[Asterisk-code-review] chan sip: Address runaway when realtime peers subscribe to ... (asterisk[14])

George Joseph asteriskteam at digium.com
Thu Sep 22 13:29:15 CDT 2016


George Joseph has uploaded a new change for review.

  https://gerrit.asterisk.org/3964

Change subject: chan_sip:  Address runaway when realtime peers subscribe to mailboxes
......................................................................

chan_sip:  Address runaway when realtime peers subscribe to mailboxes

Users upgrading from asterisk 13.5 to a later version and who use
realtime with peers that have mailboxes were experiencing runaway
situations that manifested as a continuous stream of taskprocessor
congestion errors, memory leaks and an unresponsive chan_sip.

A related issue was that setting rtpcachefriends=no NEVER worked in
asterisk 13 (since the move to stasis).  In 13.5 and earlier, when a
peer tried to register, all of the stasis threads would block and
chan_sip would again become unresponsive.  After 13.5, the runaway
would happen.

There were a number of causes...
* mwi_event_cb was (indirectly) calling build_peer even though calls to
  mwi_event_cb are often caused by build_peer.
* In an effort to prevent chan_sip from being unloaded while messages
  were still in flight, destroy_mailboxes was calling
  stasis_unsubscribe_and_join but in some cases waited forever for the
  final message.
* add_peer_mailboxes wasn't properly marking the existing mailboxes
  on a peer as "keep" so build_peer would always delete them all.
* add_peer_mwi_subs was unsubscribing existing mailbox subscriptions
  then just creating them again.

All of this was causing a flood of subscribes and unsubscribes on
multiple threads all for the same peer and mailbox.

Fixes...
* add_peer_mailboxes now marks mailboxes correctly and build_peer only
  deletes the ones that really are no longer needed by the peer.
* add_peer_mwi_subs now only adds subscriptions marked as "new" instead
  of unsubscribing and resubscribing everything.  It also adds the peer
  object's address to the mailbox instead of its name to the subscription
  userdata so mwi_event_cb doesn't have to call build_peer.

With these changes, with rtpcachefriends=yes (the most common setting),
there are no leaks, locks, loops or crashes at shutdown.

rtpcachefriends=no still causes leaks but at least it doesn't lock, loop
or crash.  Since making rtpcachefriends=no work wasnt in scope for this
issue, further work will have to be deferred to a separate patch.

Side fixes...
 * The ast_lock_track structure had a member named "thread" which gdb
   doesn't like since it conflicts with it's "thread" command.  That
   member was renamed to "thread_id".

ASTERISK-25468 #close

Change-Id: I07519ef7f092629e1e844f855abd279d6475cdd0
---
M channels/chan_sip.c
M channels/sip/include/sip.h
M include/asterisk/lock.h
M main/lock.c
4 files changed, 67 insertions(+), 39 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/64/3964/1

diff --git a/channels/chan_sip.c b/channels/chan_sip.c
index f7219ad..93e48fb 100644
--- a/channels/chan_sip.c
+++ b/channels/chan_sip.c
@@ -1278,6 +1278,7 @@
 static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 static void sip_keepalive_all_peers(void);
+#define peer_in_destruction(peer) (ao2_ref(peer, 0) == 0)
 
 /*--- Applications, functions, CLI and manager command helpers */
 static const char *sip_nat_mode(const struct sip_pvt *p);
@@ -5202,13 +5203,24 @@
 	ast_free(mailbox);
 }
 
+#define REMOVE_MAILBOX_WITH_LOCKED_PEER(__peer) \
+({\
+	struct sip_mailbox *__mailbox;\
+	ao2_lock(__peer);\
+	__mailbox = AST_LIST_REMOVE_HEAD(&(__peer->mailboxes), entry);\
+	ao2_unlock(__peer);\
+	__mailbox;\
+})
+
 /*! Destroy all peer-related mailbox subscriptions */
 static void clear_peer_mailboxes(struct sip_peer *peer)
 {
 	struct sip_mailbox *mailbox;
 
-	while ((mailbox = AST_LIST_REMOVE_HEAD(&peer->mailboxes, entry)))
+	/* Lock the peer while accessing/updating the linked list but NOT while destroying the mailbox */
+	while ((mailbox = REMOVE_MAILBOX_WITH_LOCKED_PEER(peer))) {
 		destroy_mailbox(mailbox);
+	}
 }
 
 static void sip_destroy_peer_fn(void *peer)
@@ -17252,19 +17264,21 @@
 /*! \brief Receive MWI events that we have subscribed to */
 static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
-	char *peer_name = userdata;
-	struct sip_peer *peer = sip_find_peer(peer_name, NULL, TRUE, FINDALLDEVICES, FALSE, 0);
+	struct sip_peer *peer = userdata;
 
-	if (stasis_subscription_final_message(sub, msg)) {
-		/* peer can be non-NULL during reload. */
-		ao2_cleanup(peer);
-		ast_free(peer_name);
+	/*
+	 * peer can't be NULL here but the peer can be in the process of being
+	 * destroyed.  If it is, we don't want to send any messages.  In most cases,
+	 * the peer is actually gone and there's no sense sending NOTIFYs that will
+	 * never be answered.
+	 */
+	if (stasis_subscription_final_message(sub, msg) || peer_in_destruction(peer)) {
 		return;
 	}
-	if (peer && ast_mwi_state_type() == stasis_message_type(msg)) {
+
+	if (ast_mwi_state_type() == stasis_message_type(msg)) {
 		sip_send_mwi_to_peer(peer, 0);
 	}
-	ao2_cleanup(peer);
 }
 
 static void network_change_stasis_subscribe(void)
@@ -27983,15 +27997,14 @@
 
 	AST_LIST_TRAVERSE(&peer->mailboxes, mailbox, entry) {
 		struct stasis_topic *mailbox_specific_topic;
-		mailbox->event_sub = stasis_unsubscribe(mailbox->event_sub);
+
+		if (mailbox->status != SIP_MAILBOX_STATUS_NEW) {
+			continue;
+		}
 
 		mailbox_specific_topic = ast_mwi_topic(mailbox->id);
 		if (mailbox_specific_topic) {
-			char *peer_name = ast_strdup(peer->name);
-			if (!peer_name) {
-				return;
-			}
-			mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer_name);
+			mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer);
 		}
 	}
 }
@@ -29216,7 +29229,9 @@
 }
 
 /*! \brief Send message waiting indication to alert peer that they've got voicemail
- *  \note Both peer and associated sip_pvt must be unlocked prior to calling this function
+ *  \note Both peer and associated sip_pvt must be unlocked prior to calling this function.
+ *  It's possible that this function will get called during peer destruction as final messages
+ *  are processed.  The peer will still be valid however.
  *  \returns -1 on failure, 0 on success
  */
 static int sip_send_mwi_to_peer(struct sip_peer *peer, int cache_only)
@@ -31066,6 +31081,7 @@
 		AST_LIST_TRAVERSE(&peer->mailboxes, mailbox, entry) {
 			if (!strcmp(mailbox->id, mbox)) {
 				duplicate = 1;
+				mailbox->status = SIP_MAILBOX_STATUS_EXISTING;
 				break;
 			}
 		}
@@ -31078,14 +31094,18 @@
 			continue;
 		}
 		strcpy(mailbox->id, mbox); /* SAFE */
+		mailbox->status = SIP_MAILBOX_STATUS_NEW;
+		mailbox->peer = peer;
 
 		AST_LIST_INSERT_TAIL(&peer->mailboxes, mailbox, entry);
 	}
 }
 
 /*! \brief Build peer from configuration (file or realtime static/dynamic) */
-static struct sip_peer *build_peer(const char *name, struct ast_variable *v, struct ast_variable *alt, int realtime, int devstate_only)
+static struct sip_peer *build_peer(const char *name, struct ast_variable *v_head, struct ast_variable *alt, int realtime, int devstate_only)
 {
+	/* We preserve the original value of v_head to make analyzing backtraces easier */
+	struct ast_variable *v = v_head;
 	struct sip_peer *peer = NULL;
 	struct ast_acl_list *oldacl = NULL;
 	struct ast_acl_list *olddirectmediaacl = NULL;
@@ -31149,6 +31169,7 @@
 			return NULL;
 		}
 
+
 		if (realtime && !ast_test_flag(&global_flags[1], SIP_PAGE2_RTCACHEFRIENDS)) {
 			ast_atomic_fetchadd_int(&rpeerobjs, 1);
 			ast_debug(3, "-REALTIME- peer built. Name: %s. Peer objects: %d\n", name, rpeerobjs);
@@ -31198,7 +31219,7 @@
 	if (!devstate_only) {
 		struct sip_mailbox *mailbox;
 		AST_LIST_TRAVERSE(&peer->mailboxes, mailbox, entry) {
-			mailbox->delme = 1;
+			mailbox->status = SIP_MAILBOX_STATUS_UNKNOWN;
 		}
 	}
 
@@ -31658,7 +31679,7 @@
 	if (!devstate_only) {
 		struct sip_mailbox *mailbox;
 		AST_LIST_TRAVERSE_SAFE_BEGIN(&peer->mailboxes, mailbox, entry) {
-			if (mailbox->delme) {
+			if (mailbox->status == SIP_MAILBOX_STATUS_UNKNOWN) {
 				AST_LIST_REMOVE_CURRENT(entry);
 				destroy_mailbox(mailbox);
 			}
diff --git a/channels/sip/include/sip.h b/channels/sip/include/sip.h
index 92dcd56..e511d13 100644
--- a/channels/sip/include/sip.h
+++ b/channels/sip/include/sip.h
@@ -1237,6 +1237,12 @@
 	struct ast_str *data;
 };
 
+enum sip_mailbox_status {
+	SIP_MAILBOX_STATUS_UNKNOWN = 0,
+	SIP_MAILBOX_STATUS_EXISTING,
+	SIP_MAILBOX_STATUS_NEW,
+};
+
 /*!
  * \brief A peer's mailbox
  *
@@ -1247,7 +1253,8 @@
 	/*! Associated MWI subscription */
 	struct stasis_subscription *event_sub;
 	AST_LIST_ENTRY(sip_mailbox) entry;
-	unsigned int delme:1;
+	struct sip_peer *peer;
+	enum sip_mailbox_status status;
 	char id[1];
 };
 
diff --git a/include/asterisk/lock.h b/include/asterisk/lock.h
index 35a244b..652ca13 100644
--- a/include/asterisk/lock.h
+++ b/include/asterisk/lock.h
@@ -113,7 +113,7 @@
 	int lineno[AST_MAX_REENTRANCY];
 	int reentrancy;
 	const char *func[AST_MAX_REENTRANCY];
-	pthread_t thread[AST_MAX_REENTRANCY];
+	pthread_t thread_id[AST_MAX_REENTRANCY];
 #ifdef HAVE_BKTR
 	struct ast_bt backtrace[AST_MAX_REENTRANCY];
 #endif
diff --git a/main/lock.c b/main/lock.c
index 2b2a809..b35ec59 100644
--- a/main/lock.c
+++ b/main/lock.c
@@ -218,7 +218,7 @@
 		lt->lineno[0] = lineno;
 		lt->func[0] = func;
 		lt->reentrancy = 0;
-		lt->thread[0] = 0;
+		lt->thread_id[0] = 0;
 #ifdef HAVE_BKTR
 		memset(&lt->backtrace[0], 0, sizeof(lt->backtrace[0]));
 #endif
@@ -322,7 +322,7 @@
 			lt->file[lt->reentrancy] = filename;
 			lt->lineno[lt->reentrancy] = lineno;
 			lt->func[lt->reentrancy] = func;
-			lt->thread[lt->reentrancy] = pthread_self();
+			lt->thread_id[lt->reentrancy] = pthread_self();
 			lt->reentrancy++;
 		} else {
 			__ast_mutex_logger("%s line %d (%s): '%s' really deep reentrancy!\n",
@@ -402,7 +402,7 @@
 			lt->file[lt->reentrancy] = filename;
 			lt->lineno[lt->reentrancy] = lineno;
 			lt->func[lt->reentrancy] = func;
-			lt->thread[lt->reentrancy] = pthread_self();
+			lt->thread_id[lt->reentrancy] = pthread_self();
 			lt->reentrancy++;
 		} else {
 			__ast_mutex_logger("%s line %d (%s): '%s' really deep reentrancy!\n",
@@ -445,7 +445,7 @@
 
 	if (lt) {
 		ast_reentrancy_lock(lt);
-		if (lt->reentrancy && (lt->thread[ROFFSET] != pthread_self())) {
+		if (lt->reentrancy && (lt->thread_id[ROFFSET] != pthread_self())) {
 			__ast_mutex_logger("%s line %d (%s): attempted unlock mutex '%s' without owning it!\n",
 					   filename, lineno, func, mutex_name);
 			__ast_mutex_logger("%s line %d (%s): '%s' was locked here.\n",
@@ -466,7 +466,7 @@
 			lt->file[lt->reentrancy] = NULL;
 			lt->lineno[lt->reentrancy] = 0;
 			lt->func[lt->reentrancy] = NULL;
-			lt->thread[lt->reentrancy] = 0;
+			lt->thread_id[lt->reentrancy] = 0;
 		}
 
 #ifdef HAVE_BKTR
@@ -536,7 +536,7 @@
 	memcpy(lt->lineno, lt_saved->lineno, sizeof(lt->lineno));
 	lt->reentrancy = lt_saved->reentrancy;
 	memcpy(lt->func, lt_saved->func, sizeof(lt->func));
-	memcpy(lt->thread, lt_saved->thread, sizeof(lt->thread));
+	memcpy(lt->thread_id, lt_saved->thread_id, sizeof(lt->thread_id));
 #ifdef HAVE_BKTR
 	memcpy(lt->backtrace, lt_saved->backtrace, sizeof(lt->backtrace));
 #endif
@@ -571,7 +571,7 @@
 
 	if (lt) {
 		ast_reentrancy_lock(lt);
-		if (lt->reentrancy && (lt->thread[ROFFSET] != pthread_self())) {
+		if (lt->reentrancy && (lt->thread_id[ROFFSET] != pthread_self())) {
 			__ast_mutex_logger("%s line %d (%s): attempted wait using mutex '%s' without owning it!\n",
 					   filename, lineno, func, mutex_name);
 			__ast_mutex_logger("%s line %d (%s): '%s' was locked here.\n",
@@ -639,7 +639,7 @@
 
 	if (lt) {
 		ast_reentrancy_lock(lt);
-		if (lt->reentrancy && (lt->thread[ROFFSET] != pthread_self())) {
+		if (lt->reentrancy && (lt->thread_id[ROFFSET] != pthread_self())) {
 			__ast_mutex_logger("%s line %d (%s): attempted wait using mutex '%s' without owning it!\n",
 					   filename, lineno, func, mutex_name);
 			__ast_mutex_logger("%s line %d (%s): '%s' was locked here.\n",
@@ -747,7 +747,7 @@
 		lt->lineno[0] = lineno;
 		lt->func[0] = func;
 		lt->reentrancy = 0;
-		lt->thread[0] = 0;
+		lt->thread_id[0] = 0;
 #ifdef HAVE_BKTR
 		memset(&lt->backtrace[0], 0, sizeof(lt->backtrace[0]));
 #endif
@@ -790,13 +790,13 @@
 			int i;
 			pthread_t self = pthread_self();
 			for (i = lt->reentrancy - 1; i >= 0; --i) {
-				if (lt->thread[i] == self) {
+				if (lt->thread_id[i] == self) {
 					lock_found = 1;
 					if (i != lt->reentrancy - 1) {
 						lt->file[i] = lt->file[lt->reentrancy - 1];
 						lt->lineno[i] = lt->lineno[lt->reentrancy - 1];
 						lt->func[i] = lt->func[lt->reentrancy - 1];
-						lt->thread[i] = lt->thread[lt->reentrancy - 1];
+						lt->thread_id[i] = lt->thread_id[lt->reentrancy - 1];
 					}
 #ifdef HAVE_BKTR
 					bt = &lt->backtrace[i];
@@ -804,7 +804,7 @@
 					lt->file[lt->reentrancy - 1] = NULL;
 					lt->lineno[lt->reentrancy - 1] = 0;
 					lt->func[lt->reentrancy - 1] = NULL;
-					lt->thread[lt->reentrancy - 1] = AST_PTHREADT_NULL;
+					lt->thread_id[lt->reentrancy - 1] = AST_PTHREADT_NULL;
 					break;
 				}
 			}
@@ -918,7 +918,7 @@
 			lt->file[lt->reentrancy] = filename;
 			lt->lineno[lt->reentrancy] = line;
 			lt->func[lt->reentrancy] = func;
-			lt->thread[lt->reentrancy] = pthread_self();
+			lt->thread_id[lt->reentrancy] = pthread_self();
 			lt->reentrancy++;
 		}
 		ast_reentrancy_unlock(lt);
@@ -1027,7 +1027,7 @@
 			lt->file[lt->reentrancy] = filename;
 			lt->lineno[lt->reentrancy] = line;
 			lt->func[lt->reentrancy] = func;
-			lt->thread[lt->reentrancy] = pthread_self();
+			lt->thread_id[lt->reentrancy] = pthread_self();
 			lt->reentrancy++;
 		}
 		ast_reentrancy_unlock(lt);
@@ -1120,7 +1120,7 @@
 			lt->file[lt->reentrancy] = filename;
 			lt->lineno[lt->reentrancy] = line;
 			lt->func[lt->reentrancy] = func;
-			lt->thread[lt->reentrancy] = pthread_self();
+			lt->thread_id[lt->reentrancy] = pthread_self();
 			lt->reentrancy++;
 		}
 		ast_reentrancy_unlock(lt);
@@ -1213,7 +1213,7 @@
 			lt->file[lt->reentrancy] = filename;
 			lt->lineno[lt->reentrancy] = line;
 			lt->func[lt->reentrancy] = func;
-			lt->thread[lt->reentrancy] = pthread_self();
+			lt->thread_id[lt->reentrancy] = pthread_self();
 			lt->reentrancy++;
 		}
 		ast_reentrancy_unlock(lt);
@@ -1288,7 +1288,7 @@
 			lt->file[lt->reentrancy] = filename;
 			lt->lineno[lt->reentrancy] = line;
 			lt->func[lt->reentrancy] = func;
-			lt->thread[lt->reentrancy] = pthread_self();
+			lt->thread_id[lt->reentrancy] = pthread_self();
 			lt->reentrancy++;
 		}
 		ast_reentrancy_unlock(lt);
@@ -1347,7 +1347,7 @@
 			lt->file[lt->reentrancy] = filename;
 			lt->lineno[lt->reentrancy] = line;
 			lt->func[lt->reentrancy] = func;
-			lt->thread[lt->reentrancy] = pthread_self();
+			lt->thread_id[lt->reentrancy] = pthread_self();
 			lt->reentrancy++;
 		}
 		ast_reentrancy_unlock(lt);

-- 
To view, visit https://gerrit.asterisk.org/3964
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I07519ef7f092629e1e844f855abd279d6475cdd0
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: 14
Gerrit-Owner: George Joseph <gjoseph at digium.com>



More information about the asterisk-code-review mailing list