[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "thread_safe_registrar" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Jul 16 18:56:09 CDT 2012


branch "thread_safe_registrar" has been created
        at  b83034471ecf88641b803585e7a9c9d0d19aa27e (commit)

- Log -----------------------------------------------------------------
commit b83034471ecf88641b803585e7a9c9d0d19aa27e
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Mon Jul 16 18:52:56 2012 -0500

    HYDRA-827 - Changes to make registrar thread safe.

diff --git a/src/PJSIPRegistrarModule.cpp b/src/PJSIPRegistrarModule.cpp
index 1821f41..3607167 100644
--- a/src/PJSIPRegistrarModule.cpp
+++ b/src/PJSIPRegistrarModule.cpp
@@ -678,35 +678,43 @@ bool verifyContacts(const std::vector<pjsip_contact_hdr *>& contacts)
     return true;
 }
 
-BindingWrapperSeq::iterator PJSIPRegistrarModule::findMatchingBinding(pjsip_contact_hdr *contact, BindingWrapperSeq& bindings)
+BindingWrapperSeq::iterator PJSIPRegistrarModule::findMatchingBinding(const std::string& contactURI, BindingWrapperSeq& bindings)
 {
-    char buf[512];
-    pjsip_sip_uri *contactURI = (pjsip_sip_uri *)pjsip_uri_get_uri(contact->uri);
-    pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contactURI, buf, sizeof(buf));
-    std::string contactURIStr(buf, strlen(buf));
-
-    lg(Debug) << "Searching for binding " << contactURIStr;
+    lg(Debug) << "Searching for binding " << contactURI;
     for (BindingWrapperSeq::iterator iter = bindings.begin(); iter != bindings.end(); ++iter)
     {
-        lg(Debug) << "Comparing REGISTER contact " << contactURIStr << " with binding contact " << (*iter)->mBinding->contact;
-        if (contactURIStr == (*iter)->mBinding->contact || contactURIStr == "*")
+        lg(Debug) << "Comparing REGISTER contact " << contactURI << " with binding contact " << (*iter)->mBinding->contact;
+        if (contactURI == (*iter)->mBinding->contact || contactURI == "*")
         {
             lg(Debug) << "Found matching binding " << (*iter)->mBinding->contact;
             return iter;
         }
     }
-    lg(Debug) << "No matching binding found for " << contactURIStr;
+    lg(Debug) << "No matching binding found for " << contactURI;
     return bindings.end();
 }
 
-int PJSIPRegistrarModule::getExpiration(pjsip_contact_hdr *contact, pjsip_rx_data *rdata)
+void PJSIPRegistrarModule::addURIExpiration(ContactExpireMap& expireMap, pjsip_contact_hdr *contact, int defaultExpire)
 {
+    int expires = defaultExpire;
+
     if (contact->expires != -1)
     {
         lg(Debug) << "Contact header has expires parameter of " << contact->expires;
-        return contact->expires;
+        expires = contact->expires;
     }
 
+    char buf[512];
+    pjsip_sip_uri *contactURI = (pjsip_sip_uri *)pjsip_uri_get_uri(contact->uri);
+    pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contactURI, buf, sizeof(buf));
+    std::string contactURIStr(buf, strlen(buf));
+
+    expireMap[contactURIStr] = expires;
+}
+
+int PJSIPRegistrarModule::getMessageExpiration(pjsip_rx_data *rdata)
+{
+
     pjsip_expires_hdr *expires = (pjsip_expires_hdr *) pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
 
     if (expires)
@@ -725,49 +733,21 @@ int PJSIPRegistrarModule::getExpiration(pjsip_contact_hdr *contact, pjsip_rx_dat
     }
 }
 
-BindingWrapperPtr PJSIPRegistrarModule::createNewBinding(pjsip_contact_hdr *contact, const std::string &callID, int cSeq, int expiration, const std::string& aor)
+BindingWrapperPtr PJSIPRegistrarModule::createNewBinding(const std::string& contactURI, 
+      const std::string &callID, int cSeq, int expiration, const std::string& aor)
 {
-    char buf[512];
-    pjsip_sip_uri *contactURI = (pjsip_sip_uri *)pjsip_uri_get_uri(contact->uri);
-    pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contactURI, buf, sizeof(buf));
-    std::string contactURIStr(buf, strlen(buf));
-
-    lg(Debug) << "Creating new binding with Contact: " << contactURIStr
+    lg(Debug) << "Creating new binding with Contact: " << contactURI
         << ", Call-Id: " << callID
         << ", CSeq: " << cSeq
         << ", and Expires: " << expiration;
 
-    BindingPtr binding(new Binding(contactURIStr, callID, cSeq, (int)time(NULL) + expiration));
+    BindingPtr binding(new Binding(contactURI, callID, cSeq, (int)time(NULL) + expiration));
     BindingWrapperPtr wrapper(new BindingWrapper(expiration, binding, this, mEndpoint, aor));
     //We could just return binding, but using wrapper here makes the compiler
     //not complain about an unused variable.
     return wrapper;
 }
 
-class Response : public Work
-{
-public:
-    Response(pjsip_transaction *tsx, pjsip_tx_data *tdata, const std::string& aor, const RegistrarIPtr& registrar)
-        : mTsx(tsx), mTdata(tdata), mAOR(aor), mRegistrar(registrar) { }
-    void execute()
-    {
-        BindingWrapperSeq wrappers = mRegistrar->getAORBindingWrappers(mAOR);
-        for (BindingWrapperSeq::const_iterator iter = wrappers.begin(); iter != wrappers.end(); ++iter)
-        {
-            pjsip_contact_hdr *contact = pjsip_contact_hdr_create(mTdata->pool);
-            contact->uri = pjsip_parse_uri(mTdata->pool, (char*) (*iter)->mBinding->contact.c_str(), (*iter)->mBinding->contact.size(), PJSIP_URI_IN_FROMTO_HDR);
-            contact->expires = (*iter)->mBinding->expiration - time(0);
-            pjsip_msg_add_hdr(mTdata->msg, (pjsip_hdr*) contact);
-        }
-        pjsip_tsx_send_msg(mTsx, mTdata);
-    }
-private:
-    pjsip_transaction *mTsx;
-    pjsip_tx_data *mTdata;
-    const std::string mAOR;
-    RegistrarIPtr mRegistrar;
-};
-
 bool PJSIPRegistrarModule::checkAuth(pjsip_rx_data *rdata, pjsip_transaction *tsx, RequestType type)
 {
     DigestChallengeSeq digests;
@@ -804,6 +784,96 @@ void PJSIPRegistrarModule::authTimeout(pj_timer_heap_t *timer_heap, pj_timer_ent
     mAuthManager->authTimeout(timer_heap, entry);
 }
 
+class RegisterRequest : public Work
+{
+public:
+    RegisterRequest(
+            const PJSIPRegistrarModulePtr& module,
+            const RegistrarIPtr& registrar,
+            const std::string& aor,
+            const ContactExpireMap& contactExpireMap,
+            const std::string& callId,
+            int cSeq,
+            pjsip_transaction* tsx,
+            pjsip_tx_data *tdata)
+        : mModule(module),
+          mRegistrar(registrar), 
+          mAOR(aor), 
+          mContactExpireMap(contactExpireMap), 
+          mCallId(callId), 
+          mCSeq(cSeq),
+          mTsx(tsx),
+          mTdata(tdata) {}
+
+    void execute()
+    {
+        // We should attempt to determine at this point who the
+        // REGISTER is from and determine whether they have permission
+        // to be doing this sort of thing. How this is done is still murky
+        // and should perhaps be put in writing on the wiki or in some other
+        // way decided upon before trying to write any code for it.
+       
+        lg(Debug) << "AoR in REGISTER is " << mAOR;
+        BindingWrapperSeq& currentBindings = mRegistrar->getAORBindingWrappers(mAOR);
+
+        BindingWrapperSeq newBindings;
+        BindingWrapperSeq removedBindings;
+        BindingWrapperSeq existingBindings;
+        for (ContactExpireMap::iterator iter = mContactExpireMap.begin();
+                iter != mContactExpireMap.end(); ++iter)
+        {
+            BindingWrapperSeq::iterator bindingToUpdate = mModule->findMatchingBinding((*iter).first, currentBindings);
+
+            int expiration = (*iter).second;
+
+            if (bindingToUpdate != currentBindings.end())
+            {
+                if (expiration == 0)
+                {
+                    lg(Debug) << "Adding " << (*bindingToUpdate)->mBinding->contact << " to our bindings to remove";
+                    removedBindings.push_back(*bindingToUpdate);
+                }
+                else
+                {
+                    (*bindingToUpdate)->updateBinding(mCallId, mCSeq, expiration, mRegistrar->getQueue());
+                    lg(Debug) << "Maintaining " << (*bindingToUpdate)->mBinding->contact << " in our existing bindings";
+                    existingBindings.push_back(*bindingToUpdate);
+                }
+            }
+            else
+            {
+                BindingWrapperPtr wrapper = mModule->createNewBinding((*iter).first, mCallId, mCSeq, expiration, mAOR);
+                lg(Debug) << "Adding " << wrapper->mBinding->contact << " to our bindings to add";
+                newBindings.push_back(wrapper);
+            }
+        }
+
+        mRegistrar->addAndRemoveBindings(mAOR, newBindings, removedBindings, mRegistrar->getQueue());
+
+        BindingWrapperSeq wrappers = mRegistrar->getAORBindingWrappers(mAOR);
+        for (BindingWrapperSeq::const_iterator iter = wrappers.begin(); iter != wrappers.end(); ++iter)
+        {
+            pjsip_contact_hdr *contact = pjsip_contact_hdr_create(mTdata->pool);
+            contact->uri = pjsip_parse_uri(mTdata->pool, (char*) (*iter)->mBinding->contact.c_str(), (*iter)->mBinding->contact.size(), PJSIP_URI_IN_FROMTO_HDR);
+            contact->expires = (*iter)->mBinding->expiration - time(0);
+            pjsip_msg_add_hdr(mTdata->msg, (pjsip_hdr*) contact);
+        }
+        pjsip_tsx_send_msg(mTsx, mTdata);
+
+        mModule->replicateState(mAOR, existingBindings, newBindings, removedBindings);
+    }
+
+private:
+    PJSIPRegistrarModulePtr mModule;
+    RegistrarIPtr mRegistrar;
+    const std::string mAOR;
+    ContactExpireMap mContactExpireMap;
+    std::string mCallId;
+    int mCSeq;
+    pjsip_transaction *mTsx;
+    pjsip_tx_data *mTdata;
+};
+
 pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
 {
     if (rdata->msg_info.msg->line.req.method.id != PJSIP_REGISTER_METHOD)
@@ -827,17 +897,8 @@ pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
         return PJ_TRUE;
     }
 
-    // We should attempt to determine at this point who the
-    // REGISTER is from and determine whether they have permission
-    // to be doing this sort of thing. How this is done is still murky
-    // and should perhaps be put in writing on the wiki or in some other
-    // way decided upon before trying to write any code for it.
-
     std::string aor = getAOR(rdata);
-    lg(Debug) << "AoR in REGISTER is " << aor;
-    BindingWrapperSeq& currentBindings = mRegistrar->getAORBindingWrappers(aor);
     std::vector<pjsip_contact_hdr *> registerContacts = extractRegisterContacts(rdata);
-
     std::string callID(pj_strbuf(&rdata->msg_info.cid->id), pj_strlen(&rdata->msg_info.cid->id));
     int cSeq = rdata->msg_info.cseq->cseq;
 
@@ -858,38 +919,6 @@ pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
         return PJ_TRUE;
     }
 
-    BindingWrapperSeq newBindings;
-    BindingWrapperSeq removedBindings;
-    BindingWrapperSeq existingBindings;
-    for (std::vector<pjsip_contact_hdr *>::iterator iter = registerContacts.begin();
-            iter != registerContacts.end(); ++iter)
-    {
-        BindingWrapperSeq::iterator bindingToUpdate = findMatchingBinding(*iter, currentBindings);
-
-        int expiration = getExpiration(*iter, rdata);
-
-        if (bindingToUpdate != currentBindings.end())
-        {
-            if (expiration == 0)
-            {
-                lg(Debug) << "Adding " << (*bindingToUpdate)->mBinding->contact << " to our bindings to remove";
-                removedBindings.push_back(*bindingToUpdate);
-            }
-            else
-            {
-                (*bindingToUpdate)->updateBinding(callID, cSeq, expiration, mRegistrar->getQueue());
-                lg(Debug) << "Maintaining " << (*bindingToUpdate)->mBinding->contact << " in our existing bindings";
-                existingBindings.push_back(*bindingToUpdate);
-            }
-        }
-        else
-        {
-            BindingWrapperPtr wrapper = createNewBinding(*iter, callID, cSeq, expiration, aor);
-            lg(Debug) << "Adding " << wrapper->mBinding->contact << " to our bindings to add";
-            newBindings.push_back(wrapper);
-        }
-    }
-
     // We enqueue the SIP response to make sure we send it and replicate state AFTER we have updated
     // our internal bindings.
     pjsip_tx_data *tdata;
@@ -899,9 +928,18 @@ pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
         return PJ_TRUE;
     }
 
-    mRegistrar->addAndRemoveBindings(aor, newBindings, removedBindings, mRegistrar->getQueue());
-    mRegistrar->getQueue()->enqueueWork(new Response(tsx, tdata, aor, mRegistrar));
-    replicateState(aor, existingBindings, newBindings, removedBindings);
+    int defaultExpiration = getMessageExpiration(rdata);
+
+    ContactExpireMap contactExpireMap;
+    for (std::vector<pjsip_contact_hdr *>::iterator iter = registerContacts.begin();
+         iter != registerContacts.end(); ++iter)
+    {
+        addURIExpiration(contactExpireMap, (*iter), defaultExpiration);
+    }
+
+    // enqueue the work
+    getRegistrar()->getQueue()->enqueueWork(
+        new RegisterRequest(this, getRegistrar(), aor, contactExpireMap, callID, cSeq, tsx, tdata));
 
     return PJ_TRUE;
 }
diff --git a/src/PJSIPRegistrarModule.h b/src/PJSIPRegistrarModule.h
index 43dc6a6..7027da0 100644
--- a/src/PJSIPRegistrarModule.h
+++ b/src/PJSIPRegistrarModule.h
@@ -101,6 +101,8 @@ private:
 
 typedef IceUtil::Handle<RegistrarI> RegistrarIPtr;
 
+typedef std::map<std::string, int> ContactExpireMap;
+
 class PJSIPRegistrarModule : public PJSIPModule
 {
 public:
@@ -134,32 +136,35 @@ public:
         const BindingWrapperSeq& newBindings,
         const BindingWrapperSeq& removedBindings);
 
-private:
-    /**
-     * Extract the AoR from the To header of a REGISTER request
-     */
-    std::string getAOR(pjsip_rx_data *rdata);
-    /**
-     * Get the Contact URIs from a REGISTER request
-     */
-    std::vector<pjsip_contact_hdr *> extractRegisterContacts(pjsip_rx_data *rdata);
     /**
      * Try to find an existing BindingWrapper matching a Contact header from
      * a REGISTER
      */
-    BindingWrapperSeq::iterator findMatchingBinding(pjsip_contact_hdr *contact,
+    BindingWrapperSeq::iterator findMatchingBinding(const std::string& contactURI,
             BindingWrapperSeq& bindings);
-    /**
-     * Find the expiration of a binding from a REGISTER request
-     */
-    int getExpiration(pjsip_contact_hdr *contact, pjsip_rx_data *rdata);
+
     /**
      * Create a new BindingWrapper based on information extracted from a
      * REGISTER request
      */
-    BindingWrapperPtr createNewBinding(pjsip_contact_hdr *contact,
+    BindingWrapperPtr createNewBinding(const std::string& contactURI,
             const std::string& callID, int cSeq, int expiration, const std::string& aor);
 
+    /**
+     * Find the message-level expiration of a REGISTER request (its Expires header)
+     */
+    int getMessageExpiration(pjsip_rx_data *rdata);
+
+private:
+    /**
+     * Extract the AoR from the To header of a REGISTER request
+     */
+    std::string getAOR(pjsip_rx_data *rdata);
+    /**
+     * Get the Contact URIs from a REGISTER request
+     */
+    std::vector<pjsip_contact_hdr *> extractRegisterContacts(pjsip_rx_data *rdata);
+
     bool checkAuth(pjsip_rx_data *rdata,
             pjsip_transaction *tsx,
             AsteriskSCF::SIP::ExtensionPoint::V1::RequestType type);
@@ -169,6 +174,8 @@ private:
     pjsip_endpoint *mEndpoint;
     RegistrarIPtr mRegistrar;
     SIPReplicationContextPtr mReplicationContext;
+
+    void addURIExpiration(ContactExpireMap& contactExpireMap, pjsip_contact_hdr *contact, int defaultExpire);
 };
 
 typedef IceUtil::Handle<PJSIPRegistrarModule> PJSIPRegistrarModulePtr;
@@ -200,7 +207,10 @@ public:
      * Update the binding in the wrapper with new information. This will
      * result in rescheduling destruction of the binding.
      */
-    void updateBinding(const std::string &callID, int cSeq, int expiration, const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
+    void updateBinding(const std::string &callID, 
+                       int cSeq, 
+                       int expiration, 
+                       const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
 
     bool operator==(const BindingWrapper& rhs);
     bool operator==(const AsteriskSCF::SIP::Registration::V1::BindingPtr& rhs);

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/sip.git



More information about the asterisk-scf-commits mailing list