[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "thread_safe_registrar" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Jul 16 18:56:09 CDT 2012
Branch "thread_safe_registrar" has been created at b83034471ecf88641b803585e7a9c9d0d19aa27e (commit).
- Log -----------------------------------------------------------------
commit b83034471ecf88641b803585e7a9c9d0d19aa27e
Author: Ken Hunt <ken.hunt at digium.com>
Date: Mon Jul 16 18:52:56 2012 -0500
HYDRA-827 - Changes to make registrar thread safe.
diff --git a/src/PJSIPRegistrarModule.cpp b/src/PJSIPRegistrarModule.cpp
index 1821f41..3607167 100644
--- a/src/PJSIPRegistrarModule.cpp
+++ b/src/PJSIPRegistrarModule.cpp
@@ -678,35 +678,43 @@ bool verifyContacts(const std::vector<pjsip_contact_hdr *>& contacts)
return true;
}
-BindingWrapperSeq::iterator PJSIPRegistrarModule::findMatchingBinding(pjsip_contact_hdr *contact, BindingWrapperSeq& bindings)
+BindingWrapperSeq::iterator PJSIPRegistrarModule::findMatchingBinding(const std::string& contactURI, BindingWrapperSeq& bindings)
{
- char buf[512];
- pjsip_sip_uri *contactURI = (pjsip_sip_uri *)pjsip_uri_get_uri(contact->uri);
- pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contactURI, buf, sizeof(buf));
- std::string contactURIStr(buf, strlen(buf));
-
- lg(Debug) << "Searching for binding " << contactURIStr;
+ lg(Debug) << "Searching for binding " << contactURI;
for (BindingWrapperSeq::iterator iter = bindings.begin(); iter != bindings.end(); ++iter)
{
- lg(Debug) << "Comparing REGISTER contact " << contactURIStr << " with binding contact " << (*iter)->mBinding->contact;
- if (contactURIStr == (*iter)->mBinding->contact || contactURIStr == "*")
+ lg(Debug) << "Comparing REGISTER contact " << contactURI << " with binding contact " << (*iter)->mBinding->contact;
+ if (contactURI == (*iter)->mBinding->contact || contactURI == "*")
{
lg(Debug) << "Found matching binding " << (*iter)->mBinding->contact;
return iter;
}
}
- lg(Debug) << "No matching binding found for " << contactURIStr;
+ lg(Debug) << "No matching binding found for " << contactURI;
return bindings.end();
}
-int PJSIPRegistrarModule::getExpiration(pjsip_contact_hdr *contact, pjsip_rx_data *rdata)
+void PJSIPRegistrarModule::addURIExpiration(ContactExpireMap& expireMap, pjsip_contact_hdr *contact, int defaultExpire)
{
+ int expires = defaultExpire;
+
if (contact->expires != -1)
{
lg(Debug) << "Contact header has expires parameter of " << contact->expires;
- return contact->expires;
+ expires = contact->expires;
}
+ char buf[512];
+ pjsip_sip_uri *contactURI = (pjsip_sip_uri *)pjsip_uri_get_uri(contact->uri);
+ pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contactURI, buf, sizeof(buf));
+ std::string contactURIStr(buf, strlen(buf));
+
+ expireMap[contactURIStr] = expires;
+}
+
+int PJSIPRegistrarModule::getMessageExpiration(pjsip_rx_data *rdata)
+{
+
pjsip_expires_hdr *expires = (pjsip_expires_hdr *) pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
if (expires)
@@ -725,49 +733,21 @@ int PJSIPRegistrarModule::getExpiration(pjsip_contact_hdr *contact, pjsip_rx_dat
}
}
-BindingWrapperPtr PJSIPRegistrarModule::createNewBinding(pjsip_contact_hdr *contact, const std::string &callID, int cSeq, int expiration, const std::string& aor)
+BindingWrapperPtr PJSIPRegistrarModule::createNewBinding(const std::string& contactURI,
+ const std::string &callID, int cSeq, int expiration, const std::string& aor)
{
- char buf[512];
- pjsip_sip_uri *contactURI = (pjsip_sip_uri *)pjsip_uri_get_uri(contact->uri);
- pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contactURI, buf, sizeof(buf));
- std::string contactURIStr(buf, strlen(buf));
-
- lg(Debug) << "Creating new binding with Contact: " << contactURIStr
+ lg(Debug) << "Creating new binding with Contact: " << contactURI
<< ", Call-Id: " << callID
<< ", CSeq: " << cSeq
<< ", and Expires: " << expiration;
- BindingPtr binding(new Binding(contactURIStr, callID, cSeq, (int)time(NULL) + expiration));
+ BindingPtr binding(new Binding(contactURI, callID, cSeq, (int)time(NULL) + expiration));
BindingWrapperPtr wrapper(new BindingWrapper(expiration, binding, this, mEndpoint, aor));
//We could just return binding, but using wrapper here makes the compiler
//not complain about an unused variable.
return wrapper;
}
-class Response : public Work
-{
-public:
- Response(pjsip_transaction *tsx, pjsip_tx_data *tdata, const std::string& aor, const RegistrarIPtr& registrar)
- : mTsx(tsx), mTdata(tdata), mAOR(aor), mRegistrar(registrar) { }
- void execute()
- {
- BindingWrapperSeq wrappers = mRegistrar->getAORBindingWrappers(mAOR);
- for (BindingWrapperSeq::const_iterator iter = wrappers.begin(); iter != wrappers.end(); ++iter)
- {
- pjsip_contact_hdr *contact = pjsip_contact_hdr_create(mTdata->pool);
- contact->uri = pjsip_parse_uri(mTdata->pool, (char*) (*iter)->mBinding->contact.c_str(), (*iter)->mBinding->contact.size(), PJSIP_URI_IN_FROMTO_HDR);
- contact->expires = (*iter)->mBinding->expiration - time(0);
- pjsip_msg_add_hdr(mTdata->msg, (pjsip_hdr*) contact);
- }
- pjsip_tsx_send_msg(mTsx, mTdata);
- }
-private:
- pjsip_transaction *mTsx;
- pjsip_tx_data *mTdata;
- const std::string mAOR;
- RegistrarIPtr mRegistrar;
-};
-
bool PJSIPRegistrarModule::checkAuth(pjsip_rx_data *rdata, pjsip_transaction *tsx, RequestType type)
{
DigestChallengeSeq digests;
@@ -804,6 +784,96 @@ void PJSIPRegistrarModule::authTimeout(pj_timer_heap_t *timer_heap, pj_timer_ent
mAuthManager->authTimeout(timer_heap, entry);
}
+class RegisterRequest : public Work
+{
+public:
+ RegisterRequest(
+ const PJSIPRegistrarModulePtr& module,
+ const RegistrarIPtr& registrar,
+ const std::string& aor,
+ const ContactExpireMap& contactExpireMap,
+ const std::string& callId,
+ int cSeq,
+ pjsip_transaction* tsx,
+ pjsip_tx_data *tdata)
+ : mModule(module),
+ mRegistrar(registrar),
+ mAOR(aor),
+ mContactExpireMap(contactExpireMap),
+ mCallId(callId),
+ mCSeq(cSeq),
+ mTsx(tsx),
+ mTdata(tdata) {}
+
+ void execute()
+ {
+ // We should attempt to determine at this point who the
+ // REGISTER is from and determine whether they have permission
+ // to be doing this sort of thing. How this is done is still murky
+ // and should perhaps be put in writing on the wiki or in some other
+ // way decided upon before trying to write any code for it.
+
+ lg(Debug) << "AoR in REGISTER is " << mAOR;
+ BindingWrapperSeq& currentBindings = mRegistrar->getAORBindingWrappers(mAOR);
+
+ BindingWrapperSeq newBindings;
+ BindingWrapperSeq removedBindings;
+ BindingWrapperSeq existingBindings;
+ for (ContactExpireMap::iterator iter = mContactExpireMap.begin();
+ iter != mContactExpireMap.end(); ++iter)
+ {
+ BindingWrapperSeq::iterator bindingToUpdate = mModule->findMatchingBinding((*iter).first, currentBindings);
+
+ int expiration = (*iter).second;
+
+ if (bindingToUpdate != currentBindings.end())
+ {
+ if (expiration == 0)
+ {
+ lg(Debug) << "Adding " << (*bindingToUpdate)->mBinding->contact << " to our bindings to remove";
+ removedBindings.push_back(*bindingToUpdate);
+ }
+ else
+ {
+ (*bindingToUpdate)->updateBinding(mCallId, mCSeq, expiration, mRegistrar->getQueue());
+ lg(Debug) << "Maintaining " << (*bindingToUpdate)->mBinding->contact << " in our existing bindings";
+ existingBindings.push_back(*bindingToUpdate);
+ }
+ }
+ else
+ {
+ BindingWrapperPtr wrapper = mModule->createNewBinding((*iter).first, mCallId, mCSeq, expiration, mAOR);
+ lg(Debug) << "Adding " << wrapper->mBinding->contact << " to our bindings to add";
+ newBindings.push_back(wrapper);
+ }
+ }
+
+ mRegistrar->addAndRemoveBindings(mAOR, newBindings, removedBindings, mRegistrar->getQueue());
+
+ BindingWrapperSeq wrappers = mRegistrar->getAORBindingWrappers(mAOR);
+ for (BindingWrapperSeq::const_iterator iter = wrappers.begin(); iter != wrappers.end(); ++iter)
+ {
+ pjsip_contact_hdr *contact = pjsip_contact_hdr_create(mTdata->pool);
+ contact->uri = pjsip_parse_uri(mTdata->pool, (char*) (*iter)->mBinding->contact.c_str(), (*iter)->mBinding->contact.size(), PJSIP_URI_IN_FROMTO_HDR);
+ contact->expires = (*iter)->mBinding->expiration - time(0);
+ pjsip_msg_add_hdr(mTdata->msg, (pjsip_hdr*) contact);
+ }
+ pjsip_tsx_send_msg(mTsx, mTdata);
+
+ mModule->replicateState(mAOR, existingBindings, newBindings, removedBindings);
+ }
+
+private:
+ PJSIPRegistrarModulePtr mModule;
+ RegistrarIPtr mRegistrar;
+ const std::string mAOR;
+ ContactExpireMap mContactExpireMap;
+ std::string mCallId;
+ int mCSeq;
+ pjsip_transaction *mTsx;
+ pjsip_tx_data *mTdata;
+};
+
pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
{
if (rdata->msg_info.msg->line.req.method.id != PJSIP_REGISTER_METHOD)
@@ -827,17 +897,8 @@ pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
return PJ_TRUE;
}
- // We should attempt to determine at this point who the
- // REGISTER is from and determine whether they have permission
- // to be doing this sort of thing. How this is done is still murky
- // and should perhaps be put in writing on the wiki or in some other
- // way decided upon before trying to write any code for it.
-
std::string aor = getAOR(rdata);
- lg(Debug) << "AoR in REGISTER is " << aor;
- BindingWrapperSeq& currentBindings = mRegistrar->getAORBindingWrappers(aor);
std::vector<pjsip_contact_hdr *> registerContacts = extractRegisterContacts(rdata);
-
std::string callID(pj_strbuf(&rdata->msg_info.cid->id), pj_strlen(&rdata->msg_info.cid->id));
int cSeq = rdata->msg_info.cseq->cseq;
@@ -858,38 +919,6 @@ pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
return PJ_TRUE;
}
- BindingWrapperSeq newBindings;
- BindingWrapperSeq removedBindings;
- BindingWrapperSeq existingBindings;
- for (std::vector<pjsip_contact_hdr *>::iterator iter = registerContacts.begin();
- iter != registerContacts.end(); ++iter)
- {
- BindingWrapperSeq::iterator bindingToUpdate = findMatchingBinding(*iter, currentBindings);
-
- int expiration = getExpiration(*iter, rdata);
-
- if (bindingToUpdate != currentBindings.end())
- {
- if (expiration == 0)
- {
- lg(Debug) << "Adding " << (*bindingToUpdate)->mBinding->contact << " to our bindings to remove";
- removedBindings.push_back(*bindingToUpdate);
- }
- else
- {
- (*bindingToUpdate)->updateBinding(callID, cSeq, expiration, mRegistrar->getQueue());
- lg(Debug) << "Maintaining " << (*bindingToUpdate)->mBinding->contact << " in our existing bindings";
- existingBindings.push_back(*bindingToUpdate);
- }
- }
- else
- {
- BindingWrapperPtr wrapper = createNewBinding(*iter, callID, cSeq, expiration, aor);
- lg(Debug) << "Adding " << wrapper->mBinding->contact << " to our bindings to add";
- newBindings.push_back(wrapper);
- }
- }
-
// We enqueue the SIP response to make sure we send it and replicate state AFTER we have updated
// our internal bindings.
pjsip_tx_data *tdata;
@@ -899,9 +928,18 @@ pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
return PJ_TRUE;
}
- mRegistrar->addAndRemoveBindings(aor, newBindings, removedBindings, mRegistrar->getQueue());
- mRegistrar->getQueue()->enqueueWork(new Response(tsx, tdata, aor, mRegistrar));
- replicateState(aor, existingBindings, newBindings, removedBindings);
+ int defaultExpiration = getMessageExpiration(rdata);
+
+ ContactExpireMap contactExpireMap;
+ for (std::vector<pjsip_contact_hdr *>::iterator iter = registerContacts.begin();
+ iter != registerContacts.end(); ++iter)
+ {
+ addURIExpiration(contactExpireMap, (*iter), defaultExpiration);
+ }
+
+ // enqueue the work
+ getRegistrar()->getQueue()->enqueueWork(
+ new RegisterRequest(this, getRegistrar(), aor, contactExpireMap, callID, cSeq, tsx, tdata));
return PJ_TRUE;
}
diff --git a/src/PJSIPRegistrarModule.h b/src/PJSIPRegistrarModule.h
index 43dc6a6..7027da0 100644
--- a/src/PJSIPRegistrarModule.h
+++ b/src/PJSIPRegistrarModule.h
@@ -101,6 +101,8 @@ private:
typedef IceUtil::Handle<RegistrarI> RegistrarIPtr;
+typedef std::map<std::string, int> ContactExpireMap;
+
class PJSIPRegistrarModule : public PJSIPModule
{
public:
@@ -134,32 +136,35 @@ public:
const BindingWrapperSeq& newBindings,
const BindingWrapperSeq& removedBindings);
-private:
- /**
- * Extract the AoR from the To header of a REGISTER request
- */
- std::string getAOR(pjsip_rx_data *rdata);
- /**
- * Get the Contact URIs from a REGISTER request
- */
- std::vector<pjsip_contact_hdr *> extractRegisterContacts(pjsip_rx_data *rdata);
/**
* Try to find an existing BindingWrapper matching a Contact header from
* a REGISTER
*/
- BindingWrapperSeq::iterator findMatchingBinding(pjsip_contact_hdr *contact,
+ BindingWrapperSeq::iterator findMatchingBinding(const std::string& contactURI,
BindingWrapperSeq& bindings);
- /**
- * Find the expiration of a binding from a REGISTER request
- */
- int getExpiration(pjsip_contact_hdr *contact, pjsip_rx_data *rdata);
+
/**
* Create a new BindingWrapper based on information extracted from a
* REGISTER request
*/
- BindingWrapperPtr createNewBinding(pjsip_contact_hdr *contact,
+ BindingWrapperPtr createNewBinding(const std::string& contactURI,
const std::string& callID, int cSeq, int expiration, const std::string& aor);
+ /**
+ * Find the expiration of a binding from a REGISTER request
+ */
+ int getMessageExpiration(pjsip_rx_data *rdata);
+
+private:
+ /**
+ * Extract the AoR from the To header of a REGISTER request
+ */
+ std::string getAOR(pjsip_rx_data *rdata);
+ /**
+ * Get the Contact URIs from a REGISTER request
+ */
+ std::vector<pjsip_contact_hdr *> extractRegisterContacts(pjsip_rx_data *rdata);
+
bool checkAuth(pjsip_rx_data *rdata,
pjsip_transaction *tsx,
AsteriskSCF::SIP::ExtensionPoint::V1::RequestType type);
@@ -169,6 +174,8 @@ private:
pjsip_endpoint *mEndpoint;
RegistrarIPtr mRegistrar;
SIPReplicationContextPtr mReplicationContext;
+
+ void addURIExpiration(ContactExpireMap& contactExpireMap, pjsip_contact_hdr *contact, int defaultExpire);
};
typedef IceUtil::Handle<PJSIPRegistrarModule> PJSIPRegistrarModulePtr;
@@ -200,7 +207,10 @@ public:
* Update the binding in the wrapper with new information. This will
* result in rescheduling destruction of the binding.
*/
- void updateBinding(const std::string &callID, int cSeq, int expiration, const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
+ void updateBinding(const std::string &callID,
+ int cSeq,
+ int expiration,
+ const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
bool operator==(const BindingWrapper& rhs);
bool operator==(const AsteriskSCF::SIP::Registration::V1::BindingPtr& rhs);
-----------------------------------------------------------------------
--
asterisk-scf/integration/sip.git
More information about the asterisk-scf-commits mailing list