[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "threading" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon May 9 17:57:27 CDT 2011
branch "threading" has been updated
via 2e022453c47afd990d1f27e4403e813a45060e84 (commit)
via 9f7b96df9ee33cb9469d21446a7cdde44c3ccb7a (commit)
via 369e38a8e535a447a56e913dd3da63d857bc1ab0 (commit)
via 1f9e8d9ff5e1fa75e0cd9f6be7699efc400dc796 (commit)
via 556bf54a532d466b1b77c5872d01de211386ef15 (commit)
via bfbb0f9d2ced1b1ec12b1cf4c6b57625451bc47c (commit)
via 8dd3dc0e1a319ae29aa0cfd7eb67955d26d79241 (commit)
via b2c9c8642fbde0022dd790f0a7fc4ccaa58071b1 (commit)
via 76872d653fe255eb38335206d55eee31ad94f0c6 (commit)
via ee47a2ef8ac50751307755c71010d5c4b91effcc (commit)
via f268b4a8686fa136d4b6a4894d7232a439cf0b6f (commit)
via b917d36d4d91e0069678882fa6785a6ff78ce677 (commit)
via d395576d5663b58be0b3ac74609e735b5fa858d5 (commit)
via 13f04bb0a3bc4098b7cc9b1a5f394e2876376292 (commit)
from 5620383d390adef3a55d7a81234f1e1a8ff03430 (commit)
Summary of changes:
src/PJSipSessionModule.cpp | 130 +++++--
src/PJSipSessionModule.h | 45 +--
src/SipEndpoint.cpp | 9 +-
src/SipSession.cpp | 622 ++++++++++++++++++++----------
src/SipSession.h | 89 ++++-
src/SipSessionManagerEndpointLocator.cpp | 16 +-
src/SipSessionManagerEndpointLocator.h | 3 +-
7 files changed, 622 insertions(+), 292 deletions(-)
- Log -----------------------------------------------------------------
commit 2e022453c47afd990d1f27e4403e813a45060e84
Author: Mark Michelson <mmichelson at digium.com>
Date: Mon May 9 17:57:10 2011 -0500
Start making more functions queueable and use AMD.
Definitely incomplete, but just need to push what I have.
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index ba1f48c..caf5169 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -245,10 +245,12 @@ AsteriskSCF::SessionCommunications::V1::SessionInfoPtr SipSession::addListener(
class IndicateOperation : public SuspendableWork
{
public:
- IndicateOperation(const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
+ IndicateOperation(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_indicatePtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
const boost::shared_ptr<SipSessionPriv>& sessionPriv,
const SipSessionPtr& session)
- : mIndication(indication), mImplPriv(sessionPriv), mSession(session) { }
+ : mCb(cb), mIndication(indication), mImplPriv(sessionPriv), mSession(session) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
@@ -298,9 +300,11 @@ public:
pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
}
+ mCb->ice_response();
return Complete;
}
+ AsteriskSCF::SessionCommunications::V1::AMD_Session_indicatePtr mCb;
AsteriskSCF::SessionCommunications::V1::IndicationPtr mIndication;
boost::shared_ptr<SipSessionPriv> mImplPriv;
SipSessionPtr mSession;
@@ -309,90 +313,153 @@ public:
/**
* An implementation of the indicate method as defined in SessionCommunications.ice
*/
-void SipSession::indicate(const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication, const Ice::Current&)
+void SipSession::indicate_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_indicatePtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
+ const Ice::Current&)
{
- SessionWorkPtr sessionWork = getSessionWork();
-
- sessionWork->enqueueWork(new IndicateOperation(indication, mImplPriv, this));
+ enqueueSessionWork(new IndicateOperation(cb, indication, mImplPriv, this));
}
+class GetEndpointOperation : public SuspendableWork
+{
+public:
+ GetEndpointOperation(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_getEndpointPtr& cb,
+ const boost::shared_ptr<SipSessionPriv> sessionPriv)
+ : mCb(cb), mImplPriv(sessionPriv) { }
+
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ {
+ mCb->ice_response(mImplPriv->mEndpoint->getEndpointProxy());
+ }
+};
+
/**
* An implementation of the getEndpoint method as defined in SessionCommunications.ice
*/
-AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx SipSession::getEndpoint(const Ice::Current&)
+void SipSession::getEndpoint_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_getEndpointPtr& cb,
+ const Ice::Current&)
{
- return mImplPriv->mEndpoint->getEndpointProxy();
+ enqueueSessionWork(new GetEndpointOperation(cb, mImplPriv));
}
+class GetInfoOperation : public SuspendableWork
+{
+public:
+ //XXX This will likely need to be modified to take more parameters once we're
+ //actually returning actual info.
+ GetInfoOperation(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_getInfoPtr& cb)
+ : mCb(cb) { }
+
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ {
+ AsteriskSCF::SessionCommunications::V1::SessionInfoPtr sessioninfo =
+ new AsteriskSCF::SessionCommunications::V1::SessionInfo();
+
+ /* TODO: Utilize locking so this becomes safe
+ if (!mImplPriv->mInviteSession || mImplPriv->mInviteSession->state == PJSIP_INV_STATE_NULL)
+ {
+ sessioninfo->currentState = "ready";
+ }
+ else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_CALLING)
+ {
+ sessioninfo->currentState = "outbound";
+ }
+ else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_INCOMING)
+ {
+ sessioninfo->currentState = "inbound";
+ }
+ else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_EARLY)
+ {
+ sessioninfo->currentState = "early";
+ }
+ else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_CONNECTING)
+ {
+ sessioninfo->currentState = "connecting";
+ }
+ else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_CONFIRMED)
+ {
+ sessioninfo->currentState = "connected";
+ }
+ else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_DISCONNECTED)
+ {
+ sessioninfo->currentState = "disconnected";
+ }
+ */
+
+ mCb->ice_response(sessionInfo);
+ return Complete;
+ }
+
+ AsteriskSCF::SessionCommunications::V1::AMD_Session_getInfoPtr mCb;
+};
+
/**
* An implementation of the getInfo method as defined in SessionCommunications.ice
*/
-AsteriskSCF::SessionCommunications::V1::SessionInfoPtr SipSession::getInfo(const Ice::Current&)
-{
- AsteriskSCF::SessionCommunications::V1::SessionInfoPtr sessioninfo =
- new AsteriskSCF::SessionCommunications::V1::SessionInfo();
-
- /* TODO: Utilize locking so this becomes safe
- if (!mImplPriv->mInviteSession || mImplPriv->mInviteSession->state == PJSIP_INV_STATE_NULL)
- {
- sessioninfo->currentState = "ready";
- }
- else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_CALLING)
- {
- sessioninfo->currentState = "outbound";
- }
- else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_INCOMING)
- {
- sessioninfo->currentState = "inbound";
- }
- else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_EARLY)
- {
- sessioninfo->currentState = "early";
- }
- else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_CONNECTING)
- {
- sessioninfo->currentState = "connecting";
- }
- else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_CONFIRMED)
- {
- sessioninfo->currentState = "connected";
- }
- else if (mImplPriv->mInviteSession->state == PJSIP_INV_STATE_DISCONNECTED)
- {
- sessioninfo->currentState = "disconnected";
- }
- */
-
- return sessioninfo;
+void SipSession::getInfo_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_getInfoPtr& cb,
+ const Ice::Current&)
+{
+ enqueueSessionWork(new GetInfoOperation(cb));
}
+class GetMediaSessionOperation : public SuspendableWork
+{
+public:
+ //XXX PICK UP HERE TUESDAY
+};
+
/**
* An implementation of the connect method as defined in SessionCommunications.ice
*/
-AsteriskSCF::Media::V1::SessionPrx SipSession::getMediaSession(const Ice::Current&)
+void SipSession::getMediaSession_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_getMediaSessionPtr& cb,
+ const Ice::Current&)
{
return mImplPriv->mMediaSessionProxy;
}
-/**
- * An implementation of the getBridge method as defined in SessionCommunications.ice
- */
-AsteriskSCF::SessionCommunications::V1::BridgePrx SipSession::getBridge(const Ice::Current&)
+class GetBridgeOperation : public SuspendableWork
{
- boost::shared_lock<boost::shared_mutex> lock(mImplPriv->mLock);
+ GetBridgeOperation(const AsteriskSCF::SessionCommunications::V1::AMD_Session_getBridgedPtr& cb,
+ const boost::shared_ptr<SipSessionPriv>& sessionPriv)
+ : mCb(cb), mImplPriv(sessionPriv) { }
- if (mImplPriv->mBridge == 0)
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
- throw AsteriskSCF::SessionCommunications::V1::NotBridged();
+ if (mImplPriv->mBridge == 0)
+ {
+ mCb->ice_exception(AsteriskSCF::SessionCommunications::V1::NotBridged());
+ return Complete;
+ }
+
+ mCb->ice_response(mImplPriv->mBridge);
+ return Complete;
}
+
+ AsteriskSCF::SessionCommunications::V1::AMD_Session_getBridgedPtr mCb;
+ boost::shared_ptr<SipSessionPriv> mImplPriv;
+};
- return mImplPriv->mBridge;
+/**
+ * An implementation of the getBridge method as defined in SessionCommunications.ice
+ */
+void SipSession::getBridge_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_getBridgePtr& cb,
+ const Ice::Current&)
+{
+ enqueueSessionWork(new GetBridgeOperation(cb, mImplPriv));
}
class SetBridgeOperation : public SuspendableWork
{
public:
- SetBridgeOperation(const AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr& cb,
+ SetBridgeOperation(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr& cb,
const SipSessionPtr& session,
const AsteriskSCF::SessionCommunications::V1::BridgePrx& bridge,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
@@ -407,7 +474,6 @@ public:
return Complete;
}
-private:
AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr mAMDCb;
SipSessionPtr mSession;
AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge;
@@ -424,9 +490,7 @@ void SipSession::setBridge_async(
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
const Ice::Current& current)
{
- SessionWorkPtr sessionWork = getSessionWork();
-
- sessionWork->enqueueWork(new SetBridgeOperation(cb, this, bridge, listener, current));
+ enqueueSessionWork(new SetBridgeOperation(cb, this, bridge, listener, current));
}
/**
@@ -438,20 +502,45 @@ void SipSession::setBridge(const AsteriskSCF::SessionCommunications::V1::BridgeP
mImplPriv->mBridge = bridge;
}
-/**
- * An implementation of the removeBridge method as defined in SessionCommunications.ice
- */
-void SipSession::removeBridge(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
- const Ice::Current& current)
+class RemoveBridgeOperation : public SuspendableWork
{
- if (mImplPriv->mBridge == 0)
+public:
+ RemoveBridgeOperation(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_removeBridgePtr& cb,
+ const boost::shared_ptr<SipSessionPriv>& sessionPriv,
+ const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+ const Ice::Current& current)
+ : mCb(cb), mImplPriv(sessionPriv), mListener(listener), mCurrent(current) { }
+
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
- throw AsteriskSCF::SessionCommunications::V1::NotBridged();
+ if (mImplPriv->mBridge == 0)
+ {
+ mCb->ice_exception(AsteriskSCF::SessionCommunications::V1::NotBridged());
+ return Complete;
+ }
+
+ setBridge(0);
+
+ //XXX REMEMBER TO MAKE A LOCAL VERSION OF removeListener!
+ removeListener(listener, current);
}
- setBridge(0);
+ AsteriskSCF::SessionCommunications::V1::AMD_Session_removeBridgePtr cb;
+ boost::shared_ptr<SipSessionPriv> sessionPriv;
+ AsteriskSCF::SessionCommunications::V1::SessionListenerPrx listener;
+ Ice::Current current;
+};
- removeListener(listener, current);
+/**
+ * An implementation of the removeBridge method as defined in SessionCommunications.ice
+ */
+void SipSession::removeBridge_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_removeBridgePtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+ const Ice::Current& current)
+{
+ enqueueSessionWork(new RemoveBridgeOperation(cb, mImplPriv, listener, current));
}
/**
@@ -560,9 +649,7 @@ public:
*/
void SipSession::start(const Ice::Current&)
{
- SessionWorkPtr sessionWork = getSessionWork();
-
- sessionWork->enqueueWork(new StartOperation(this, mImplPriv));
+ enqueueSessionWork(new StartOperation(this, mImplPriv));
}
class StopOperation : public SuspendableWork
@@ -606,9 +693,7 @@ public:
*/
void SipSession::stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
{
- SessionWorkPtr sessionWork = getSessionWork();
-
- sessionWork->enqueueWork(new StopOperation(response, mImplPriv->mInviteSession));
+ enqueueSessionWork(new StopOperation(response, mImplPriv->mInviteSession));
}
class DestroyOperation : public SuspendableWork
@@ -655,9 +740,7 @@ public:
*/
void SipSession::destroy()
{
- SessionWorkPtr sessionWork = getSessionWork();
-
- sessionWork->enqueueWork(new DestroyOperation(this, mImplPriv));
+ enqueueSessionWork(new DestroyOperation(this, mImplPriv));
}
/**
@@ -881,5 +964,12 @@ void SipSession::setSessionWork(const SessionWorkPtr& sessionWork)
{
mImplPriv->mSessionWork = sessionWork;
}
+
+void SipSession::enqueueSessionWork(const SuspendableWorkPtr& task)
+{
+ SessionWorkPtr sessionWork = getSessionWork();
+
+ sessionWork->enqueueWork(task);
+}
}; // end SipSessionManager
}; // end AsteriskSCF
diff --git a/src/SipSession.h b/src/SipSession.h
index ab38280..8d608ef 100644
--- a/src/SipSession.h
+++ b/src/SipSession.h
@@ -111,21 +111,47 @@ public:
/**
* Interface specific functions.
*/
- AsteriskSCF::SessionCommunications::V1::SessionInfoPtr
- addListener(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const Ice::Current&);
- void indicate(const AsteriskSCF::SessionCommunications::V1::IndicationPtr&, const Ice::Current&);
+ void addListener_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_addListenerPtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&,
+ const Ice::Current&);
+
+ void indicate_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_indicatePtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::IndicationPtr&, const Ice::Current&);
+
void connect(const Ice::Current&);
+
void flash(const Ice::Current&);
- AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx getEndpoint(const Ice::Current&);
- AsteriskSCF::SessionCommunications::V1::SessionInfoPtr getInfo(const Ice::Current&);
- AsteriskSCF::Media::V1::SessionPrx getMediaSession(const Ice::Current&);
- AsteriskSCF::SessionCommunications::V1::BridgePrx getBridge(const Ice::Current&);
+
+ void getEndpoint_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_getEndpointPtr& cb,
+ const Ice::Current&);
+
+ void getInfo_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_getInfoPtr& cb,
+ const Ice::Current&);
+
+ void getMediaSession_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_getMediaSessionPtr& cb,
+ const Ice::Current&);
+
+ void getBridge_async(
+ const AsteriskSCF::SessionCommunications::V1:AMD_Session_getBridgePtr& cb,
+ const Ice::Current&);
+
void setBridge_async(
const AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr& cb,
const AsteriskSCF::SessionCommunications::V1::BridgePrx&,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const Ice::Current&);
+
void setBridge(const AsteriskSCF::SessionCommunications::V1::BridgePrx&);
- void removeBridge(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const Ice::Current&);
+
+ void removeBridge_async(
+ const AsteriskSCF::SessionCommunications::V1::Session_removeBridgePtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&,
+ const Ice::Current&);
+
void hold(const Ice::Current&);
void progress(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&, const Ice::Current&);
void removeListener(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const Ice::Current&);
commit 9f7b96df9ee33cb9469d21446a7cdde44c3ccb7a
Author: Mark Michelson <mmichelson at digium.com>
Date: Mon May 9 16:33:09 2011 -0500
Make setBridge queueable and make it use AMD.
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index b9add79..ba1f48c 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -389,17 +389,44 @@ AsteriskSCF::SessionCommunications::V1::BridgePrx SipSession::getBridge(const Ic
return mImplPriv->mBridge;
}
+class SetBridgeOperation : public SuspendableWork
+{
+public:
+ SetBridgeOperation(const AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr& cb,
+ const SipSessionPtr& session,
+ const AsteriskSCF::SessionCommunications::V1::BridgePrx& bridge,
+ const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+ const Ice::Current& current)
+ : mAMDCb(cb), mSession(session), mBridge(bridge), mListener(listener), mCurrent(current) { }
+
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ {
+ mSession->setBridge(mBridge);
+ mSession->addListener(mListener, mCurrent);
+ mAMDCb->ice_response(mSession->getInfo(mCurrent));
+ return Complete;
+ }
+
+private:
+ AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr mAMDCb;
+ SipSessionPtr mSession;
+ AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge;
+ AsteriskSCF::SessionCommunications::V1::SessionListenerPrx mListener;
+ Ice::Current mCurrent;
+};
+
/**
* An implementation of the setBridge method as defined in SessionCommunications.ice
*/
-AsteriskSCF::SessionCommunications::V1::SessionInfoPtr SipSession::setBridge(
+void SipSession::setBridge_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr& cb,
const AsteriskSCF::SessionCommunications::V1::BridgePrx& bridge,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
const Ice::Current& current)
{
- setBridge(bridge);
- addListener(listener, current);
- return getInfo(current);
+ SessionWorkPtr sessionWork = getSessionWork();
+
+ sessionWork->enqueueWork(new SetBridgeOperation(cb, this, bridge, listener, current));
}
/**
diff --git a/src/SipSession.h b/src/SipSession.h
index 8b63c5e..ab38280 100644
--- a/src/SipSession.h
+++ b/src/SipSession.h
@@ -120,7 +120,8 @@ public:
AsteriskSCF::SessionCommunications::V1::SessionInfoPtr getInfo(const Ice::Current&);
AsteriskSCF::Media::V1::SessionPrx getMediaSession(const Ice::Current&);
AsteriskSCF::SessionCommunications::V1::BridgePrx getBridge(const Ice::Current&);
- AsteriskSCF::SessionCommunications::V1::SessionInfoPtr setBridge(
+ void setBridge_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr& cb,
const AsteriskSCF::SessionCommunications::V1::BridgePrx&,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const Ice::Current&);
void setBridge(const AsteriskSCF::SessionCommunications::V1::BridgePrx&);
commit 369e38a8e535a447a56e913dd3da63d857bc1ab0
Author: Mark Michelson <mmichelson at digium.com>
Date: Mon May 9 15:20:12 2011 -0500
Make start() queueable.
I think this may take care of everything in SipSession.cpp...I'll give it
another look-see.
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index c80c320..b9add79 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -446,78 +446,96 @@ void SipSession::removeListener(const AsteriskSCF::SessionCommunications::V1::Se
}
}
-/**
- * An implementation of the start method as defined in SessionCommunications.ice which sends
- * an INVITE with SDP to the SIP endpoint.
- */
-void SipSession::start(const Ice::Current&)
+class StartOperation : public SuspendableWork
{
- pj_str_t local_uri, remote_uri;
- pjsip_dialog *dialog;
- SipEndpointConfig &config = mImplPriv->mEndpoint->getConfig();
-
- char local[64];
- pj_ansi_sprintf(local, "sip:%s", config.sessionConfig.sourceAddress.c_str());
- local_uri = pj_str(local);
-
- char remote[64];
- bool userDefined = mImplPriv->mDestination.size() != 0;
- pj_ansi_sprintf(remote, "sip:%s%s%s",
- userDefined ? mImplPriv->mDestination.c_str() : "",
- userDefined ? "@" : "",
- config.transportConfig.address.c_str());
- remote_uri = pj_str(remote);
- lg(Debug) << "Sending new SIP INVITE to " << remote;
-
- // Create a UAC dialog for the outgoing call
- if ((pjsip_dlg_create_uac(pjsip_ua_instance(), &local_uri, &local_uri, &remote_uri, &remote_uri, &dialog)) !=
- PJ_SUCCESS)
+public:
+ StartOperation(const SipSessionPtr& session, const boost::shared_ptr<SipSessionPriv>& sessionPriv)
+ : mSession(session), mImplPriv(sessionPriv) { }
+
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
- // What should we do here? Throw an exception?
- return;
- }
+ pj_str_t local_uri, remote_uri;
+ pjsip_dialog *dialog;
+ SipEndpointConfig &config = mImplPriv->mEndpoint->getConfig();
+
+ char local[64];
+ pj_ansi_sprintf(local, "sip:%s", config.sessionConfig.sourceAddress.c_str());
+ local_uri = pj_str(local);
+
+ char remote[64];
+ bool userDefined = mImplPriv->mDestination.size() != 0;
+ pj_ansi_sprintf(remote, "sip:%s%s%s",
+ userDefined ? mImplPriv->mDestination.c_str() : "",
+ userDefined ? "@" : "",
+ config.transportConfig.address.c_str());
+ remote_uri = pj_str(remote);
+ lg(Debug) << "Sending new SIP INVITE to " << remote;
+
+ // Create a UAC dialog for the outgoing call
+ if ((pjsip_dlg_create_uac(pjsip_ua_instance(), &local_uri, &local_uri, &remote_uri, &remote_uri, &dialog)) !=
+ PJ_SUCCESS)
+ {
+ // What should we do here? Throw an exception?
+ return Complete;
+ }
- // Since the SDP generation requires a pool we use the dialog one, so it has to be set here
- mImplPriv->mDialog = dialog;
+ // Since the SDP generation requires a pool we use the dialog one, so it has to be set here
+ mImplPriv->mDialog = dialog;
- pjmedia_sdp_session *sdp = createSDPOffer();
+ pjmedia_sdp_session *sdp = mSession->createSDPOffer();
- // Create an INVITE session
- pjsip_inv_session *inviteSession;
- if ((pjsip_inv_create_uac(dialog, sdp, 0, &inviteSession)) != PJ_SUCCESS)
- {
- pjsip_dlg_terminate(dialog);
- // What should we do here? Throw an exception?
- return;
- }
+ // Create an INVITE session
+ pjsip_inv_session *inviteSession;
+ if ((pjsip_inv_create_uac(dialog, sdp, 0, &inviteSession)) != PJ_SUCCESS)
+ {
+ pjsip_dlg_terminate(dialog);
+ // What should we do here? Throw an exception?
+ return Complete;
+ }
- pjsip_dlg_add_usage(dialog, &mImplPriv->mManager->getSessionModule()->getModule(), NULL);
+ pjsip_dlg_add_usage(dialog, &mImplPriv->mManager->getSessionModule()->getModule(), NULL);
- pjsip_timer_setting session_timer_settings;
- pjsip_timer_setting_default(&session_timer_settings);
- pjsip_timer_init_session(inviteSession, &session_timer_settings);
+ pjsip_timer_setting session_timer_settings;
+ pjsip_timer_setting_default(&session_timer_settings);
+ pjsip_timer_init_session(inviteSession, &session_timer_settings);
- // Record our session within the dialog so code handling pjsip events can do STUFF
- SipSessionPtr session = new SipSession(*this);
+ // Record our session within the dialog so code handling pjsip events can do STUFF
+ SipSessionPtr session = new SipSession(*mSession);
- PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inviteSession, session);
- inviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id] = (void*)session_mod_info;
+ PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inviteSession, session);
+ inviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id] = (void*)session_mod_info;
- // Create the actual INVITE packet
- pjsip_tx_data *packet;
- if ((pjsip_inv_invite(inviteSession, &packet)) != PJ_SUCCESS)
- {
- pjsip_inv_terminate(inviteSession, 500, 0);
- pjsip_dlg_terminate(dialog);
- // What should we do here? Throw an exception?
- return;
+ // Create the actual INVITE packet
+ pjsip_tx_data *packet;
+ if ((pjsip_inv_invite(inviteSession, &packet)) != PJ_SUCCESS)
+ {
+ pjsip_inv_terminate(inviteSession, 500, 0);
+ pjsip_dlg_terminate(dialog);
+ // What should we do here? Throw an exception?
+ return Complete;
+ }
+
+ // Before we send the message we probably should populate the endpoint data... just in case
+ mImplPriv->mInviteSession = inviteSession;
+
+ // Boom! Houston, we have transmission.
+ pjsip_inv_send_msg(inviteSession, packet);
+ return Complete;
}
- // Before we send the message we probably should populate the endpoint data... just in case
- mImplPriv->mInviteSession = inviteSession;
+ SipSessionPtr mSession;
+ boost::shared_ptr<SipSessionPriv> mImplPriv;
+};
+
+/**
+ * An implementation of the start method as defined in SessionCommunications.ice which sends
+ * an INVITE with SDP to the SIP endpoint.
+ */
+void SipSession::start(const Ice::Current&)
+{
+ SessionWorkPtr sessionWork = getSessionWork();
- // Boom! Houston, we have transmission.
- pjsip_inv_send_msg(inviteSession, packet);
+ sessionWork->enqueueWork(new StartOperation(this, mImplPriv));
}
class StopOperation : public SuspendableWork
commit 1f9e8d9ff5e1fa75e0cd9f6be7699efc400dc796
Author: Mark Michelson <mmichelson at digium.com>
Date: Mon May 9 14:54:02 2011 -0500
Place the SessionWork on the session as part of the construction.
This paves the way to allow for the start() call to be queueable.
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index bf7b941..d2aa0a8 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -622,10 +622,6 @@ void PJSipSessionModule::handleNewInvite(pjsip_rx_data *rdata)
pjsip_inv_send_msg(inv_session, tdata);
return;
}
-
- SessionWorkPtr sessionWork(new SessionWork(mPoolQueue));
- session->setSessionWork(sessionWork);
-
session->setInviteSession(inv_session);
session->setDialog(dlg);
PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inv_session, session);
diff --git a/src/SipEndpoint.cpp b/src/SipEndpoint.cpp
index 50d740e..1aecdc5 100644
--- a/src/SipEndpoint.cpp
+++ b/src/SipEndpoint.cpp
@@ -266,16 +266,18 @@ AsteriskSCF::SessionCommunications::V1::SessionPrx SipEndpoint::createSession(co
return 0;
}
+ SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue()));
SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, listener, mImplPriv->mManager,
- mImplPriv->mServiceLocator, mImplPriv->mReplica);
+ mImplPriv->mServiceLocator, mImplPriv->mReplica, sessionWork);
mImplPriv->mSessions.push_back(session);
return session->getSessionProxy();
}
AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const std::string& destination)
{
+ SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue()));
SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, 0, mImplPriv->mManager,
- mImplPriv->mServiceLocator, mImplPriv->mReplica);
+ mImplPriv->mServiceLocator, mImplPriv->mReplica, sessionWork);
mImplPriv->mSessions.push_back(session);
return session;
}
@@ -285,8 +287,9 @@ AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const s
const AsteriskSCF::Media::V1::SessionPrx& mediasession, const AsteriskSCF::Media::V1::StreamSourceSeq& sources,
const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
{
+ SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue()));
SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, sessionid, mediaid, mediasession,
- sources, sinks, mImplPriv->mManager, mImplPriv->mServiceLocator, mImplPriv->mReplica);
+ sources, sinks, mImplPriv->mManager, mImplPriv->mServiceLocator, mImplPriv->mReplica, sessionWork);
mImplPriv->mSessions.push_back(session);
return session;
}
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index a269f60..c80c320 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -86,9 +86,10 @@ public:
SipSessionPriv(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPtr& endpoint,
const std::string& destination, PJSipManager *manager,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr& replica)
+ const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+ const SessionWorkPtr& sessionWork)
: mAdapter(adapter), mDialog(0), mInviteSession(0), mEndpoint(endpoint), mDestination(destination),
- mManager(manager), mServiceLocator(serviceLocator), mReplica(replica) { };
+ mManager(manager), mServiceLocator(serviceLocator), mReplica(replica), mSessionWork(sessionWork) { };
/**
* An instance of a media session.
@@ -178,8 +179,8 @@ public:
SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPtr& endpoint,
const std::string& destination, const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr& replica)
- : mImplPriv(new SipSessionPriv(adapter, endpoint, destination, manager, serviceLocator, replica))
+ const AsteriskSCF::System::Component::V1::ReplicaPtr& replica, const SessionWorkPtr& sessionWork)
+ : mImplPriv(new SipSessionPriv(adapter, endpoint, destination, manager, serviceLocator, replica, sessionWork))
{
if (listener != 0)
{
@@ -206,8 +207,8 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
const Ice::Identity& mediaid, const AsteriskSCF::Media::V1::SessionPrx& mediasession,
const AsteriskSCF::Media::V1::StreamSourceSeq& sources, const AsteriskSCF::Media::V1::StreamSinkSeq& sinks,
PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr& replica)
- : mImplPriv(new SipSessionPriv(adapter, endpoint, destination, manager, serviceLocator, replica))
+ const AsteriskSCF::System::Component::V1::ReplicaPtr& replica, const SessionWorkPtr& sessionWork)
+ : mImplPriv(new SipSessionPriv(adapter, endpoint, destination, manager, serviceLocator, replica, sessionWork))
{
mImplPriv->mSessionProxy =
AsteriskSCF::SessionCommunications::V1::SessionPrx::uncheckedCast(adapter->add(this, sessionid));
@@ -499,9 +500,6 @@ void SipSession::start(const Ice::Current&)
// Record our session within the dialog so code handling pjsip events can do STUFF
SipSessionPtr session = new SipSession(*this);
- SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue()));
- session->setSessionWork(sessionWork);
-
PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inviteSession, session);
inviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id] = (void*)session_mod_info;
diff --git a/src/SipSession.h b/src/SipSession.h
index ab46d76..8b63c5e 100644
--- a/src/SipSession.h
+++ b/src/SipSession.h
@@ -96,13 +96,15 @@ public:
SipSession(const Ice::ObjectAdapterPtr&, const SipEndpointPtr&, const std::string&,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, PJSipManager *manager,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr& replica);
+ const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+ const SessionWorkPtr& sessionWork);
SipSession(const Ice::ObjectAdapterPtr&, const SipEndpointPtr&, const std::string&, const Ice::Identity&,
const Ice::Identity&, const AsteriskSCF::Media::V1::SessionPrx&,
const AsteriskSCF::Media::V1::StreamSourceSeq&, const AsteriskSCF::Media::V1::StreamSinkSeq&,
PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr& replica);
+ const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+ const SessionWorkPtr& sessionWork);
bool operator==(const SipSession &other) const;
commit 556bf54a532d466b1b77c5872d01de211386ef15
Author: Mark Michelson <mmichelson at digium.com>
Date: Mon May 9 14:26:48 2011 -0500
Add an IndicateOperation.
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 3c69bc4..a269f60 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -241,56 +241,78 @@ AsteriskSCF::SessionCommunications::V1::SessionInfoPtr SipSession::addListener(
return getInfo(current);
}
+class IndicateOperation : public SuspendableWork
+{
+public:
+ IndicateOperation(const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
+ const boost::shared_ptr<SipSessionPriv>& sessionPriv,
+ const SipSessionPtr& session)
+ : mIndication(indication), mImplPriv(sessionPriv), mSession(session) { }
+
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ {
+ AsteriskSCF::SessionCommunications::V1::ConnectIndicationPtr Connect;
+ AsteriskSCF::SessionCommunications::V1::FlashIndicationPtr Flash;
+ AsteriskSCF::SessionCommunications::V1::HoldIndicationPtr Hold;
+ AsteriskSCF::SessionCommunications::V1::ProgressIndicationPtr Progress;
+ AsteriskSCF::SessionCommunications::V1::RingIndicationPtr Ring;
+ AsteriskSCF::SessionCommunications::V1::UnholdIndicationPtr Unhold;
+ pjsip_tx_data *packet = NULL;
+ pj_status_t status = -1;
+
+ if ((Connect = AsteriskSCF::SessionCommunications::V1::ConnectIndicationPtr::dynamicCast(mIndication)))
+ {
+ pjmedia_sdp_session *sdp = mSession->createSDPOffer();
+ status = pjsip_inv_answer(mImplPriv->mInviteSession, 200, NULL, sdp, &packet);
+ }
+ else if ((Flash = AsteriskSCF::SessionCommunications::V1::FlashIndicationPtr::dynamicCast(mIndication)))
+ {
+ // This is usually transported using INFO or RFC2833, so for now just pretend it does not exist
+ }
+ else if ((Hold = AsteriskSCF::SessionCommunications::V1::HoldIndicationPtr::dynamicCast(mIndication)))
+ {
+ // TODO: Update SDP with sendonly attribute and no IP
+ // TODO: This is actually passing the hold through, we will need to support local generation
+ status = pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, NULL, &packet);
+ }
+ else if ((Progress = AsteriskSCF::SessionCommunications::V1::ProgressIndicationPtr::dynamicCast(mIndication)))
+ {
+ pjmedia_sdp_session *sdp = mSession->createSDPOffer();
+ status = pjsip_inv_answer(mImplPriv->mInviteSession, 183, NULL, sdp, &packet);
+ }
+ else if ((Ring = AsteriskSCF::SessionCommunications::V1::RingIndicationPtr::dynamicCast(mIndication)))
+ {
+ status = pjsip_inv_answer(mImplPriv->mInviteSession, 180, NULL, NULL, &packet);
+ }
+ else if ((Unhold = AsteriskSCF::SessionCommunications::V1::UnholdIndicationPtr::dynamicCast(mIndication)))
+ {
+ // TODO: Update SDP with sendrecv and IP
+ // TODO: This is actually passing the unhold through, we will need to support local generation
+ status = pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, NULL, &packet);
+ }
+
+ // If the indication produced a packet send it out
+ if (status == PJ_SUCCESS)
+ {
+ pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
+ }
+
+ return Complete;
+ }
+
+ AsteriskSCF::SessionCommunications::V1::IndicationPtr mIndication;
+ boost::shared_ptr<SipSessionPriv> mImplPriv;
+ SipSessionPtr mSession;
+};
+
/**
* An implementation of the indicate method as defined in SessionCommunications.ice
*/
void SipSession::indicate(const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication, const Ice::Current&)
{
- AsteriskSCF::SessionCommunications::V1::ConnectIndicationPtr Connect;
- AsteriskSCF::SessionCommunications::V1::FlashIndicationPtr Flash;
- AsteriskSCF::SessionCommunications::V1::HoldIndicationPtr Hold;
- AsteriskSCF::SessionCommunications::V1::ProgressIndicationPtr Progress;
- AsteriskSCF::SessionCommunications::V1::RingIndicationPtr Ring;
- AsteriskSCF::SessionCommunications::V1::UnholdIndicationPtr Unhold;
- pjsip_tx_data *packet = NULL;
- pj_status_t status = -1;
-
- if ((Connect = AsteriskSCF::SessionCommunications::V1::ConnectIndicationPtr::dynamicCast(indication)))
- {
- pjmedia_sdp_session *sdp = createSDPOffer();
- status = pjsip_inv_answer(mImplPriv->mInviteSession, 200, NULL, sdp, &packet);
- }
- else if ((Flash = AsteriskSCF::SessionCommunications::V1::FlashIndicationPtr::dynamicCast(indication)))
- {
- // This is usually transported using INFO or RFC2833, so for now just pretend it does not exist
- }
- else if ((Hold = AsteriskSCF::SessionCommunications::V1::HoldIndicationPtr::dynamicCast(indication)))
- {
- // TODO: Update SDP with sendonly attribute and no IP
- // TODO: This is actually passing the hold through, we will need to support local generation
- status = pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, NULL, &packet);
- }
- else if ((Progress = AsteriskSCF::SessionCommunications::V1::ProgressIndicationPtr::dynamicCast(indication)))
- {
- pjmedia_sdp_session *sdp = createSDPOffer();
- status = pjsip_inv_answer(mImplPriv->mInviteSession, 183, NULL, sdp, &packet);
- }
- else if ((Ring = AsteriskSCF::SessionCommunications::V1::RingIndicationPtr::dynamicCast(indication)))
- {
- status = pjsip_inv_answer(mImplPriv->mInviteSession, 180, NULL, NULL, &packet);
- }
- else if ((Unhold = AsteriskSCF::SessionCommunications::V1::UnholdIndicationPtr::dynamicCast(indication)))
- {
- // TODO: Update SDP with sendrecv and IP
- // TODO: This is actually passing the unhold through, we will need to support local generation
- status = pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, NULL, &packet);
- }
+ SessionWorkPtr sessionWork = getSessionWork();
- // If the indication produced a packet send it out
- if (status == PJ_SUCCESS)
- {
- pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
- }
+ sessionWork->enqueueWork(new IndicateOperation(indication, mImplPriv, this));
}
/**
commit bfbb0f9d2ced1b1ec12b1cf4c6b57625451bc47c
Author: Mark Michelson <mmichelson at digium.com>
Date: Mon May 9 13:01:51 2011 -0500
Rearrange structures in a more intelligent way.
Before, the SessionWork was contained in the PJSipSessionModInfo. Now,
the SessionWork is part of the SipSession. It makes so much more sense
this way.
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 5ee5902..bf7b941 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -290,16 +290,6 @@ void PJSipSessionModInfo::setSessionPtr(const SipSessionPtr& sessionPtr)
mSession = sessionPtr;
}
-SessionWorkPtr PJSipSessionModInfo::getSessionWork()
-{
- return mSessionWork;
-}
-
-void PJSipSessionModInfo::setSessionWork(const SessionWorkPtr& sessionWork)
-{
- mSessionWork = sessionWork;
-}
-
InviteSessionState PJSipSessionModInfo::inviteStateTranslate(pjsip_inv_state state)
{
InviteSessionState retState;
@@ -633,12 +623,12 @@ void PJSipSessionModule::handleNewInvite(pjsip_rx_data *rdata)
return;
}
- SessionWorkPtr sessionWork(new SessionWork(mPoolQueue, session));
+ SessionWorkPtr sessionWork(new SessionWork(mPoolQueue));
+ session->setSessionWork(sessionWork);
session->setInviteSession(inv_session);
session->setDialog(dlg);
PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inv_session, session);
- session_mod_info->setSessionWork(sessionWork);
dlg_mod_info->mDialogState->mSessionId = session_mod_info->mSessionState->mSessionId;
tsx_mod_info->mTransactionState->mSessionId = session_mod_info->mSessionState->mSessionId;
@@ -1333,8 +1323,8 @@ void PJSipSessionModuleThreadPoolListener::queueEmptied(const PoolPtr& pool)
pool->setSize(0);
}
-SessionWork::SessionWork(const QueuePtr& queue, const SipSessionPtr& session)
- : mThreadPoolQueue(queue), mSession(session),
+SessionWork::SessionWork(const QueuePtr& queue)
+ : mThreadPoolQueue(queue),
mInternalQueue(new SuspendableWorkQueue(this)) { }
void SessionWork::workAdded(Ice::Long, bool wasEmpty)
@@ -1361,11 +1351,6 @@ void SessionWork::execute()
while(mInternalQueue->executeWork());
}
-SipSessionPtr SessionWork::getSession()
-{
- return mSession;
-}
-
void SessionWork::enqueueWork(const SuspendableWorkPtr& work)
{
mInternalQueue->enqueueWork(work);
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index 6aae6fd..75706e3 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -26,7 +26,6 @@
#include <AsteriskSCF/SmartProxy.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
-#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
#include "SipStateReplicator.h"
#include "SipSession.h"
@@ -38,43 +37,6 @@ namespace AsteriskSCF
namespace SipSessionManager
{
-/**
- * This is where session related work items get enqueued.
- *
- * Each session will have its own SessionWork object to which it will queue suspendable work.
- */
-class SessionWork : public AsteriskSCF::System::WorkQueue::V1::Work, public AsteriskSCF::System::WorkQueue::V1::QueueListener
-{
-public:
- /**
- * Overrides of QueueListener interface
- */
- SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue,
- const SipSessionPtr& session);
- void workAdded(Ice::Long newWork, bool wasEmpty);
- void workResumable();
- void emptied();
-
- /**
- * Override of Work interface
- */
- void execute();
-
- /**
- * This is the method that will be called within the SIP session gateway that will
- * result in work making it into the session gateway's thread pool.
- */
- void enqueueWork(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkPtr& work);
-
- SipSessionPtr getSession();
-private:
- AsteriskSCF::System::WorkQueue::V1::QueuePtr mThreadPoolQueue;
- SipSessionPtr mSession;
- AsteriskSCF::System::WorkQueue::V1::SuspendableQueuePtr mInternalQueue;
-};
-
-typedef IceUtil::Handle<SessionWork> SessionWorkPtr;
-
class PJSipSessionModInfo
{
public:
@@ -83,8 +45,6 @@ public:
void updateSessionState(pjsip_inv_session *inv_session);
SipSessionPtr getSessionPtr();
void setSessionPtr(const SipSessionPtr& sessionPtr);
- SessionWorkPtr getSessionWork();
- void setSessionWork(const SessionWorkPtr& sessionWork);
SipSessionStateItemPtr mSessionState;
SipInviteSessionStateItemPtr mInviteState;
bool mNeedsReplication;
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 86bc3c9..3c69bc4 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -164,6 +164,8 @@ public:
AsteriskSCF::System::Component::V1::ReplicaPtr mReplica;
+ SessionWorkPtr mSessionWork;
+
/**
* Shared mutex lock which protects some of the session.
*/
@@ -475,10 +477,10 @@ void SipSession::start(const Ice::Current&)
// Record our session within the dialog so code handling pjsip events can do STUFF
SipSessionPtr session = new SipSession(*this);
- SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue(), session));
+ SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue()));
+ session->setSessionWork(sessionWork);
PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inviteSession, session);
- session_mod_info->setSessionWork(sessionWork);
inviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id] = (void*)session_mod_info;
// Create the actual INVITE packet
@@ -539,15 +541,7 @@ public:
*/
void SipSession::stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
{
- PJSipSessionModInfo *session_mod_info =
- static_cast<PJSipSessionModInfo*>(mImplPriv->mInviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id]);
-
- SessionWorkPtr sessionWork = session_mod_info->getSessionWork();
-
- if (!sessionWork)
- {
- //XXX Um...
- }
+ SessionWorkPtr sessionWork = getSessionWork();
sessionWork->enqueueWork(new StopOperation(response, mImplPriv->mInviteSession));
}
@@ -556,10 +550,10 @@ class DestroyOperation : public SuspendableWork
{
public:
- DestroyOperation(const boost::shared_ptr<SipSessionPriv>& sessionPriv)
- : mSessionPriv(sessionPriv) { }
+ DestroyOperation(const SipSessionPtr& session, const boost::shared_ptr<SipSessionPriv>& sessionPriv)
+ : mSession(session), mSessionPriv(sessionPriv) { }
- SuspendableWorkResult execute(const SuspendableWorkListener&)
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
// Remove all of the different interfaces we have exposed to the world.
mSessionPriv->mAdapter->remove(mSessionPriv->mSessionProxy->ice_getIdentity());
@@ -568,6 +562,7 @@ public:
if (mSessionPriv->mReplica->isActive() == true)
{
+ //XXX This loop may be a candidate for making AMI-ified and returning "Suspended"
// Release all the RTP sessions we are using
for (std::vector<AsteriskSCF::Media::RTP::V1::RTPSessionPrx>::const_iterator i =
mSessionPriv->mRTPSessions.begin(); i != mSessionPriv->mRTPSessions.end(); ++i)
@@ -595,29 +590,9 @@ public:
*/
void SipSession::destroy()
{
- // Remove all of the different interfaces we have exposed to the world.
- mImplPriv->mAdapter->remove(mImplPriv->mSessionProxy->ice_getIdentity());
- mImplPriv->mAdapter->remove(mImplPriv->mMediaSessionProxy->ice_getIdentity());
- mImplPriv->mMediaSession = 0;
-
- if (mImplPriv->mReplica->isActive() == true)
- {
- // Release all the RTP sessions we are using
- for (std::vector<AsteriskSCF::Media::RTP::V1::RTPSessionPrx>::const_iterator i =
- mImplPriv->mRTPSessions.begin(); i != mImplPriv->mRTPSessions.end(); ++i)
- {
- try
- {
- (*i)->release();
- }
- catch (const Ice::Exception& ex)
- {
- lg(Error) << "Exception caught while trying to release a media session\n" << ex.what();
- }
- }
- }
+ SessionWorkPtr sessionWork = getSessionWork();
- mImplPriv->mEndpoint->removeSession(this);
+ sessionWork->enqueueWork(new DestroyOperation(this, mImplPriv));
}
/**
@@ -832,5 +807,14 @@ bool SipSession::operator==(const SipSession &other) const {
return (this->mImplPriv->mInviteSession == other.mImplPriv->mInviteSession);
}
+SessionWorkPtr SipSession::getSessionWork()
+{
+ return mImplPriv->mSessionWork;
+}
+
+void SipSession::setSessionWork(const SessionWorkPtr& sessionWork)
+{
+ mImplPriv->mSessionWork = sessionWork;
+}
}; // end SipSessionManager
}; // end AsteriskSCF
diff --git a/src/SipSession.h b/src/SipSession.h
index 3596260..ab46d76 100644
--- a/src/SipSession.h
+++ b/src/SipSession.h
@@ -26,6 +26,7 @@
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
#include <SipStateReplicationIf.h>
@@ -53,6 +54,39 @@ class SipSessionPriv;
//I can haz forward declaration.
class PJSipManager;
+/**
+ * This is where session related work items get enqueued.
+ *
+ * Each session will have its own SessionWork object to which it will queue suspendable work.
+ */
+class SessionWork : public AsteriskSCF::System::WorkQueue::V1::Work, public AsteriskSCF::System::WorkQueue::V1::QueueListener
+{
+public:
+ /**
+ * Overrides of QueueListener interface
+ */
+ SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue);
+ void workAdded(Ice::Long newWork, bool wasEmpty);
+ void workResumable();
+ void emptied();
+
+ /**
+ * Override of Work interface
+ */
+ void execute();
+
+ /**
+ * This is the method that will be called within the SIP session gateway that will
+ * result in work making it into the session gateway's thread pool.
+ */
+ void enqueueWork(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkPtr& work);
+private:
+ AsteriskSCF::System::WorkQueue::V1::QueuePtr mThreadPoolQueue;
+ AsteriskSCF::System::WorkQueue::V1::SuspendableQueuePtr mInternalQueue;
+};
+
+typedef IceUtil::Handle<SessionWork> SessionWorkPtr;
+
/*
* Implementation of the Session interface as defined in SessionCommunicationsIf.ice
*/
@@ -132,6 +166,10 @@ public:
void setListeners(const AsteriskSCF::SIP::V1::SessionListenerSeq&);
AsteriskSCF::Media::V1::SessionPrx getHiddenMediaSession();
+
+ SessionWorkPtr getSessionWork();
+
+ void setSessionWork(const SessionWorkPtr& sessionWork);
private:
void requestRTPSessions(AsteriskSCF::Media::V1::FormatSeq& formats);
commit 8dd3dc0e1a319ae29aa0cfd7eb67955d26d79241
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri May 6 15:53:00 2011 -0500
Add a DestroyOperation class.
I'm not actually using it at the moment because I'm not sure if the session_mod_info
is always going to be valid when we attempt to destroy the session. I have a plan
for reorganization, so I'm going to fix that next.
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 5bbf949..86bc3c9 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -552,6 +552,44 @@ void SipSession::stop(const AsteriskSCF::SessionCommunications::V1::ResponseCode
sessionWork->enqueueWork(new StopOperation(response, mImplPriv->mInviteSession));
}
+class DestroyOperation : public SuspendableWork
+{
+public:
+
+ DestroyOperation(const boost::shared_ptr<SipSessionPriv>& sessionPriv)
+ : mSessionPriv(sessionPriv) { }
+
+ SuspendableWorkResult execute(const SuspendableWorkListener&)
+ {
+ // Remove all of the different interfaces we have exposed to the world.
+ mSessionPriv->mAdapter->remove(mSessionPriv->mSessionProxy->ice_getIdentity());
+ mSessionPriv->mAdapter->remove(mSessionPriv->mMediaSessionProxy->ice_getIdentity());
+ mSessionPriv->mMediaSession = 0;
+
+ if (mSessionPriv->mReplica->isActive() == true)
+ {
+ // Release all the RTP sessions we are using
+ for (std::vector<AsteriskSCF::Media::RTP::V1::RTPSessionPrx>::const_iterator i =
+ mSessionPriv->mRTPSessions.begin(); i != mSessionPriv->mRTPSessions.end(); ++i)
+ {
+ try
+ {
+ (*i)->release();
+ }
+ catch (const Ice::Exception& ex)
+ {
+ lg(Error) << "Exception caught while trying to release a media session\n" << ex.what();
+ }
+ }
+ }
+ mSessionPriv->mEndpoint->removeSession(mSession);
+ return Complete;
+ }
+
+ SipSessionPtr mSession;
+ boost::shared_ptr<SipSessionPriv> mSessionPriv;
+};
+
/**
* Internal function called to destroy an endpoint. This is controlled by signaling.
*/
commit b2c9c8642fbde0022dd790f0a7fc4ccaa58071b1
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri May 6 13:13:28 2011 -0500
Fix up compilation problems.
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 3e45f10..5ee5902 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -1304,14 +1304,14 @@ QueuePtr PJSipSessionModule::getThreadPoolQueue()
PJSipSessionModuleThreadPoolListener::PJSipSessionModuleThreadPoolListener()
: mActiveThreads(0) { }
-void PJSipSessionModuleThreadPoolListener::stateChanged(const PoolPtr& pool, int active, int idle, int)
+void PJSipSessionModuleThreadPoolListener::stateChanged(const PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long)
{
//XXX Making this behavior more customizable would be nice
//
//For now, what we do is kill all idle threads.
if (idle > 0)
{
- pool->setSize(active);
+ pool->setSize((int) active);
}
mActiveThreads = active;
}
@@ -1321,7 +1321,7 @@ void PJSipSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, I
//XXX Making this behavior more customizable would be nice
//
//For now, use one thread per work item.
- int newSize = mActiveThreads + numNewWork;
+ int newSize = (int) (mActiveThreads + numNewWork);
pool->setSize(newSize);
}
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index fed1c94..6aae6fd 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -106,11 +106,11 @@ class PJSipSessionModuleThreadPoolListener : public AsteriskSCF::System::ThreadP
{
public:
PJSipSessionModuleThreadPoolListener();
- void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int active, int idle, int zombie);
+ void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long zombie);
void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long count, bool wasEmpty);
void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
private:
- int mActiveThreads;
+ Ice::Long mActiveThreads;
};
typedef IceUtil::Handle<PJSipSessionModuleThreadPoolListener> PJSipSessionModuleThreadPoolListenerPtr;
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index fddc0ab..5bbf949 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -40,6 +40,8 @@ namespace AsteriskSCF
namespace SipSessionManager
{
+using namespace AsteriskSCF::System::WorkQueue::V1;
+
class SipMediaSession : public Media::V1::Session
{
public:
@@ -501,9 +503,9 @@ class StopOperation : public SuspendableWork
public:
StopOperation(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr response,
pjsip_inv_session *inv)
- : mResponse(response) mInviteSession(inv) { }
+ : mResponse(response), mInviteSession(inv) { }
- void execute()
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
pjsip_tx_data *packet;
// Assume a 503 until proven otherwise
@@ -523,6 +525,8 @@ public:
{
pjsip_inv_send_msg(mInviteSession, packet);
}
+
+ return Complete;
}
AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
@@ -535,14 +539,17 @@ public:
*/
void SipSession::stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
{
- SessionWorkPtr sessionWork = mImplPriv->mManager->getSessionModule()->getSessionWork();
+ PJSipSessionModInfo *session_mod_info =
+ static_cast<PJSipSessionModInfo*>(mImplPriv->mInviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id]);
+
+ SessionWorkPtr sessionWork = session_mod_info->getSessionWork();
if (!sessionWork)
{
//XXX Um...
}
- sessionWork->enqueueWork(new StopOperation(response));
+ sessionWork->enqueueWork(new StopOperation(response, mImplPriv->mInviteSession));
}
/**
commit 76872d653fe255eb38335206d55eee31ad94f0c6
Merge: ee47a2e f268b4a
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri May 6 12:35:15 2011 -0500
Merge branch 'master' into threading
commit ee47a2ef8ac50751307755c71010d5c4b91effcc
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri May 6 12:34:30 2011 -0500
Adjust for changes made in WorkQueue and ThreadPool slice and add a queueable operation.
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 507cdc9..46ed79a 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -1273,7 +1273,7 @@ void PJSipSessionModuleThreadPoolListener::stateChanged(const PoolPtr& pool, int
mActiveThreads = active;
}
-void PJSipSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, int numNewWork, bool)
+void PJSipSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, Ice::Long numNewWork, bool)
{
//XXX Making this behavior more customizable would be nice
//
@@ -1294,7 +1294,7 @@ SessionWork::SessionWork(const QueuePtr& queue, const SipSessionPtr& session)
: mThreadPoolQueue(queue), mSession(session),
mInternalQueue(new SuspendableWorkQueue(this)) { }
-void SessionWork::workAdded(int, bool wasEmpty)
+void SessionWork::workAdded(Ice::Long, bool wasEmpty)
{
if (wasEmpty)
{
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index f93d153..fed1c94 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -51,7 +51,7 @@ public:
*/
SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue,
const SipSessionPtr& session);
- void workAdded(int newWork, bool wasEmpty);
+ void workAdded(Ice::Long newWork, bool wasEmpty);
void workResumable();
void emptied();
@@ -107,7 +107,7 @@ class PJSipSessionModuleThreadPoolListener : public AsteriskSCF::System::ThreadP
public:
PJSipSessionModuleThreadPoolListener();
void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int active, int idle, int zombie);
- void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int count, bool wasEmpty);
+ void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long count, bool wasEmpty);
void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
private:
int mActiveThreads;
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 46cfa11..fddc0ab 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -20,6 +20,7 @@
#include <boost/thread.hpp>
#include <boost/thread/shared_mutex.hpp>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
#include <AsteriskSCF/logger.h>
#include "PJSipManager.h"
@@ -495,30 +496,53 @@ void SipSession::start(const Ice::Current&)
pjsip_inv_send_msg(inviteSession, packet);
}
+class StopOperation : public SuspendableWork
+{
+public:
+ StopOperation(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr response,
+ pjsip_inv_session *inv)
+ : mResponse(response) mInviteSession(inv) { }
+
+ void execute()
+ {
+ pjsip_tx_data *packet;
+ // Assume a 503 until proven otherwise
+ unsigned int code = 503;
+
+ // TODO: Convert ALL response codes to equivalent SIP ones, and allow configuration to change it
+ if (mResponse->isdnCode == 17)
+ {
+ code = 486;
+ }
+
+ // We have to check the existence of packet due to an edge case that we can trigger here.
+ // On an outbound call, if we have not received a provisional response yet, then PJSIP will
+ // set packet NULL but still return PJ_SUCCESS. In this case, if we attempt to call pjsip_inv_send_msg,
+ // then we will trigger an assertion since the packet we pass in is NULL.
+ if (mInviteSession && (pjsip_inv_end_session(mInviteSession, code, NULL, &packet) == PJ_SUCCESS) && packet)
+ {
+ pjsip_inv_send_msg(mInviteSession, packet);
+ }
+ }
+
+ AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
+ pjsip_inv_session *mInviteSession;
+};
+
/**
* An implementation of the stop method as defined in SessionCommunications.ice which sends
* a BYE or applicable response code to the SIP endpoint depending upon the state of the dialog.
*/
void SipSession::stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
{
- pjsip_tx_data *packet;
- // Assume a 503 until proven otherwise
- unsigned int code = 503;
+ SessionWorkPtr sessionWork = mImplPriv->mManager->getSessionModule()->getSessionWork();
- // TODO: Convert ALL response codes to equivalent SIP ones, and allow configuration to change it
- if (response->isdnCode == 17)
+ if (!sessionWork)
{
- code = 486;
+ //XXX Um...
}
- // We have to check the existence of packet due to an edge case that we can trigger here.
- // On an outbound call, if we have not received a provisional response yet, then PJSIP will
- // set packet NULL but still return PJ_SUCCESS. In this case, if we attempt to call pjsip_inv_send_msg,
- // then we will trigger an assertion since the packet we pass in is NULL.
- if (mImplPriv->mInviteSession && (pjsip_inv_end_session(mImplPriv->mInviteSession, code, NULL, &packet) == PJ_SUCCESS) && packet)
- {
- pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
- }
+ sessionWork->enqueueWork(new StopOperation(response));
}
/**
commit b917d36d4d91e0069678882fa6785a6ff78ce677
Author: Mark Michelson <mmichelson at digium.com>
Date: Thu May 5 16:37:29 2011 -0500
Add the SessionWork to the session_mod_info, and set it where session_mod_info is created.
This will allow us to pull the SessionWork easily.
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index b257082..507cdc9 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -249,6 +249,16 @@ void PJSipSessionModInfo::setSessionPtr(const SipSessionPtr& sessionPtr)
mSession = sessionPtr;
}
+SessionWorkPtr PJSipSessionModInfo::getSessionWork()
+{
+ return mSessionWork;
+}
+
+void PJSipSessionModInfo::setSessionWork(const SessionWorkPtr& sessionWork)
+{
+ mSessionWork = sessionWork;
+}
+
InviteSessionState PJSipSessionModInfo::inviteStateTranslate(pjsip_inv_state state)
{
InviteSessionState retState;
@@ -587,6 +597,7 @@ void PJSipSessionModule::handleNewInvite(pjsip_rx_data *rdata)
session->setInviteSession(inv_session);
session->setDialog(dlg);
PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inv_session, session);
+ session_mod_info->setSessionWork(sessionWork);
dlg_mod_info->mDialogState->mSessionId = session_mod_info->mSessionState->mSessionId;
tsx_mod_info->mTransactionState->mSessionId = session_mod_info->mSessionState->mSessionId;
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index aea0594..f93d153 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -38,43 +38,6 @@ namespace AsteriskSCF
namespace SipSessionManager
{
-class PJSipSessionModInfo
-{
-public:
- PJSipSessionModInfo(pjsip_inv_session *inv_session, const SipSessionPtr&);
- ~PJSipSessionModInfo();
- void updateSessionState(pjsip_inv_session *inv_session);
- SipSessionPtr getSessionPtr();
- void setSessionPtr(const SipSessionPtr& sessionPtr);
- SipSessionStateItemPtr mSessionState;
- SipInviteSessionStateItemPtr mInviteState;
- bool mNeedsReplication;
- bool mNeedsRemoval;
- boost::shared_mutex mLock;
-private:
- InviteSessionState inviteStateTranslate(pjsip_inv_state state);
- SipSessionPtr mSession;
-};
-
-/**
- * Listens to the PJSipSessionModule's thread pool.
- *
- * It is responsible for all decisions regarding the pool. This mostly means
- * that it decides when the pool should have greater or fewer threads.
- */
-class PJSipSessionModuleThreadPoolListener : public AsteriskSCF::System::ThreadPool::V1::PoolListener
-{
-public:
- PJSipSessionModuleThreadPoolListener();
- void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int active, int idle, int zombie);
- void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int count, bool wasEmpty);
- void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
-private:
- int mActiveThreads;
-};
-
-typedef IceUtil::Handle<PJSipSessionModuleThreadPoolListener> PJSipSessionModuleThreadPoolListenerPtr;
-
/**
* This is where session related work items get enqueued.
*
@@ -112,6 +75,46 @@ private:
typedef IceUtil::Handle<SessionWork> SessionWorkPtr;
+class PJSipSessionModInfo
+{
+public:
+ PJSipSessionModInfo(pjsip_inv_session *inv_session, const SipSessionPtr&);
+ ~PJSipSessionModInfo();
+ void updateSessionState(pjsip_inv_session *inv_session);
+ SipSessionPtr getSessionPtr();
+ void setSessionPtr(const SipSessionPtr& sessionPtr);
+ SessionWorkPtr getSessionWork();
+ void setSessionWork(const SessionWorkPtr& sessionWork);
+ SipSessionStateItemPtr mSessionState;
+ SipInviteSessionStateItemPtr mInviteState;
+ bool mNeedsReplication;
+ bool mNeedsRemoval;
+ boost::shared_mutex mLock;
+private:
+ InviteSessionState inviteStateTranslate(pjsip_inv_state state);
+ SipSessionPtr mSession;
+ SessionWorkPtr mSessionWork;
+};
+
+/**
+ * Listens to the PJSipSessionModule's thread pool.
+ *
+ * It is responsible for all decisions regarding the pool. This mostly means
+ * that it decides when the pool should have greater or fewer threads.
+ */
+class PJSipSessionModuleThreadPoolListener : public AsteriskSCF::System::ThreadPool::V1::PoolListener
+{
+public:
+ PJSipSessionModuleThreadPoolListener();
+ void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int active, int idle, int zombie);
+ void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int count, bool wasEmpty);
+ void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
+private:
+ int mActiveThreads;
+};
+
+typedef IceUtil::Handle<PJSipSessionModuleThreadPoolListener> PJSipSessionModuleThreadPoolListenerPtr;
+
class PJSipSessionModule : public PJSipModule
{
public:
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 5019d57..46cfa11 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -475,6 +475,7 @@ void SipSession::start(const Ice::Current&)
SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue(), session));
PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inviteSession, session);
+ session_mod_info->setSessionWork(sessionWork);
inviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id] = (void*)session_mod_info;
// Create the actual INVITE packet
commit d395576d5663b58be0b3ac74609e735b5fa858d5
Author: Mark Michelson <mmichelson at digium.com>
Date: Thu May 5 16:14:44 2011 -0500
Add some definitions for the pool listener and allocate SessionWork where necessary.
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index d506329..b257082 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -581,6 +581,9 @@ void PJSipSessionModule::handleNewInvite(pjsip_rx_data *rdata)
pjsip_inv_send_msg(inv_session, tdata);
return;
}
+
+ SessionWorkPtr sessionWork(new SessionWork(mPoolQueue, session));
+
session->setInviteSession(inv_session);
session->setDialog(dlg);
PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inv_session, session);
@@ -1239,23 +1242,41 @@ pjsip_dialog *PJSipSessionModule::uaOnDialogForked(pjsip_dialog*, pjsip_rx_data*
return NULL;
}
-PJSipSessionModuleThreadPoolListener::PJSipSessionModuleThreadPoolListener()
+QueuePtr PJSipSessionModule::getThreadPoolQueue()
{
+ return mPoolQueue;
}
-void PJSipSessionModuleThreadPoolListener::stateChanged(const PoolPtr&, int, int, int)
+PJSipSessionModuleThreadPoolListener::PJSipSessionModuleThreadPoolListener()
+ : mActiveThreads(0) { }
+
+void PJSipSessionModuleThreadPoolListener::stateChanged(const PoolPtr& pool, int active, int idle, int)
{
- //stub
+ //XXX Making this behavior more customizable would be nice
+ //
+ //For now, what we do is kill all idle threads.
+ if (idle > 0)
+ {
+ pool->setSize(active);
+ }
+ mActiveThreads = active;
}
-void PJSipSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr&, int, bool)
+void PJSipSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, int numNewWork, bool)
{
- //stub
+ //XXX Making this behavior more customizable would be nice
+ //
+ //For now, use one thread per work item.
+ int newSize = mActiveThreads + numNewWork;
+ pool->setSize(newSize);
}
-void PJSipSessionModuleThreadPoolListener::queueEmptied(const PoolPtr&)
+void PJSipSessionModuleThreadPoolListener::queueEmptied(const PoolPtr& pool)
{
- //stub
+ //XXX Making this behavior more customizable would be nice
+ //
+ //For now, kill off everything
+ pool->setSize(0);
}
SessionWork::SessionWork(const QueuePtr& queue, const SipSessionPtr& session)
@@ -1286,6 +1307,11 @@ void SessionWork::execute()
while(mInternalQueue->executeWork());
}
+SipSessionPtr SessionWork::getSession()
+{
+ return mSession;
+}
+
void SessionWork::enqueueWork(const SuspendableWorkPtr& work)
{
mInternalQueue->enqueueWork(work);
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index 2b4ec41..aea0594 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -56,8 +56,6 @@ private:
SipSessionPtr mSession;
};
-class PJSipSessionModuleThreadPoolListenerPriv;
-
/**
* Listens to the PJSipSessionModule's thread pool.
*
@@ -72,7 +70,7 @@ public:
void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int count, bool wasEmpty);
void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
private:
- boost::shared_ptr<PJSipSessionModuleThreadPoolListenerPriv> mPriv;
+ int mActiveThreads;
};
typedef IceUtil::Handle<PJSipSessionModuleThreadPoolListener> PJSipSessionModuleThreadPoolListenerPtr;
@@ -143,6 +141,8 @@ public:
// Missing onsendack for now
void replicateState(PJSipDialogModInfo *dlgInfo, PJSipTransactionModInfo *tsxInfo,
PJSipSessionModInfo *sessionInfo);
+
+ AsteriskSCF::System::WorkQueue::V1::QueuePtr getThreadPoolQueue();
private:
void handleNewInvite(pjsip_rx_data *rdata);
void handleInviteResponse(pjsip_inv_session *inv, pjsip_rx_data *rdata, pjsip_dialog *dlg);
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 0eafdfc..5019d57 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -471,6 +471,9 @@ void SipSession::start(const Ice::Current&)
// Record our session within the dialog so code handling pjsip events can do STUFF
SipSessionPtr session = new SipSession(*this);
+
+ SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue(), session));
+
PJSipSessionModInfo *session_mod_info = new PJSipSessionModInfo(inviteSession, session);
inviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id] = (void*)session_mod_info;
commit 13f04bb0a3bc4098b7cc9b1a5f394e2876376292
Author: Mark Michelson <mmichelson at digium.com>
Date: Thu May 5 13:55:08 2011 -0500
Add the initial SessionWork definitions.
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index d2c8b87..d506329 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -20,6 +20,8 @@
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/logger.h>
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/SuspendableWorkQueue.h>
#include "PJSipSessionModule.h"
#include "SipEndpoint.h"
@@ -53,6 +55,7 @@ using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::SIP::V1;
using namespace AsteriskSCF::System::ThreadPool::V1;
using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
class RouteSessionCallback : public IceUtil::Shared
{
@@ -1255,29 +1258,37 @@ void PJSipSessionModuleThreadPoolListener::queueEmptied(const PoolPtr&)
//stub
}
-SessionWork::SessionWork(const QueuePtr&)
+SessionWork::SessionWork(const QueuePtr& queue, const SipSessionPtr& session)
+ : mThreadPoolQueue(queue), mSession(session),
+ mInternalQueue(new SuspendableWorkQueue(this)) { }
+
+void SessionWork::workAdded(int, bool wasEmpty)
{
- //stub
+ if (wasEmpty)
+ {
+ mThreadPoolQueue->enqueueWork(this);
+ }
}
-void SessionWork::workAdded(int, bool)
+void SessionWork::workResumable()
{
- //stub
+ mThreadPoolQueue->enqueueWork(this);
}
void SessionWork::emptied()
{
- //stub
+ //Empty on purpose. Nothing special
+ //to do here.
}
void SessionWork::execute()
{
- //stub
+ while(mInternalQueue->executeWork());
}
-void SessionWork::enqueueWork(const SuspendableWorkPtr&)
+void SessionWork::enqueueWork(const SuspendableWorkPtr& work)
{
- //stub
+ mInternalQueue->enqueueWork(work);
}
}; //end namespace SipSessionManager
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index e680471..2b4ec41 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -77,8 +77,6 @@ private:
typedef IceUtil::Handle<PJSipSessionModuleThreadPoolListener> PJSipSessionModuleThreadPoolListenerPtr;
-class SessionWorkPriv;
-
/**
* This is where session related work items get enqueued.
*
@@ -90,8 +88,10 @@ public:
/**
* Overrides of QueueListener interface
*/
- SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue);
+ SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue,
+ const SipSessionPtr& session);
void workAdded(int newWork, bool wasEmpty);
+ void workResumable();
void emptied();
/**
@@ -104,8 +104,12 @@ public:
* result in work making it into the session gateway's thread pool.
*/
void enqueueWork(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkPtr& work);
+
+ SipSessionPtr getSession();
private:
- boost::shared_ptr<SessionWorkPriv> mPriv;
+ AsteriskSCF::System::WorkQueue::V1::QueuePtr mThreadPoolQueue;
+ SipSessionPtr mSession;
+ AsteriskSCF::System::WorkQueue::V1::SuspendableQueuePtr mInternalQueue;
};
typedef IceUtil::Handle<SessionWork> SessionWorkPtr;
-----------------------------------------------------------------------
--
asterisk-scf/integration/sip.git
More information about the asterisk-scf-commits
mailing list