[Asterisk-code-review] app voicemail: Remove need to subscribe to stasis (asterisk[13])

George Joseph asteriskteam at digium.com
Thu Sep 13 09:44:47 CDT 2018


George Joseph has uploaded this change for review. ( https://gerrit.asterisk.org/10132 )


Change subject: app_voicemail: Remove need to subscribe to stasis
......................................................................

app_voicemail: Remove need to subscribe to stasis

app_voicemail was using the stasis cache to build and maintain a
list of mailboxes that had subscribers.  It then used this list
to determine whether a mailbox should be polled for new messages when
polling was enabled.  For this to work, stasis had to cache every
subscription to and unsubscription from each mailbox, which caused a
lot of overhead in both CPU and memory.

Since polling is only required when changes are being made to
mailboxes outside of app_voicemail and since the number of mailboxes
that don't have any subscribers is likely to be very low, all
mailboxes are now polled instead of just the ones with subscribers.

This paves the way for disabling the caching of stasis subscription
change messages.

Change-Id: I5cceb737246949f9782955c64425b8bd25a9e9ee
---
M apps/app_voicemail.c
1 file changed, 107 insertions(+), 222 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/32/10132/1

diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c
index 94af147..57d83eb 100644
--- a/apps/app_voicemail.c
+++ b/apps/app_voicemail.c
@@ -994,42 +994,21 @@
 static pthread_t poll_thread = AST_PTHREADT_NULL;
 static unsigned char poll_thread_run;
 
-/*! Subscription to MWI event subscription changes */
-static struct stasis_subscription *mwi_sub_sub;
-
 /*!
- * \brief An MWI subscription
+ * \brief A mailbox to be polled
  *
- * This is so we can keep track of which mailboxes are subscribed to.
  * This way, we know which mailboxes to poll when the pollmailboxes
  * option is being used.
  */
-struct mwi_sub {
-	AST_RWLIST_ENTRY(mwi_sub) entry;
+struct poll_state {
+	AST_LIST_ENTRY(poll_state) entry;
 	int old_urgent;
 	int old_new;
 	int old_old;
-	char *uniqueid;
 	char mailbox[0];
 };
 
-struct mwi_sub_task {
-	const char *mailbox;
-	const char *context;
-	const char *uniqueid;
-};
-
-static void mwi_sub_task_dtor(struct mwi_sub_task *mwist)
-{
-	ast_free((void *) mwist->mailbox);
-	ast_free((void *) mwist->context);
-	ast_free((void *) mwist->uniqueid);
-	ast_free(mwist);
-}
-
-static struct ast_taskprocessor *mwi_subscription_tps;
-
-static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
+static AST_RWLIST_HEAD_STATIC(poll_list, poll_state);
 
 /* custom audio control prompts for voicemail playback */
 static char listen_control_forward_key[12];
@@ -1106,6 +1085,7 @@
 static const char *substitute_escapes(const char *value);
 static int message_range_and_existence_check(struct vm_state *vms, const char *msg_ids [], size_t num_msgs, int *msg_nums, struct ast_vm_user *vmu);
 static void notify_new_state(struct ast_vm_user *vmu);
+static void poll_mailbox(struct poll_state *poll_state);
 
 /*!
  * Place a message in the indicated folder
@@ -12196,6 +12176,43 @@
 	return 0;
 }
 
+/* A write lock on the poll list should already be held */
+static int append_poll_list(struct ast_vm_user *vmu)
+{
+	size_t len;
+	struct poll_state *poll_state;
+
+	len = sizeof(*poll_state) + 1;
+
+	if (!ast_strlen_zero(vmu->mailbox)) {
+		len += strlen(vmu->mailbox);
+	}
+
+	if (!ast_strlen_zero(vmu->context)) {
+		len += strlen(vmu->context) + 1; /* Allow for seperator */
+	}
+
+	poll_state = ast_calloc(1, len);
+	if (!poll_state) {
+		return -1;
+	}
+
+	if (!ast_strlen_zero(vmu->mailbox)) {
+		strcpy(poll_state->mailbox, vmu->mailbox);
+	}
+
+	if (!ast_strlen_zero(vmu->context)) {
+		strcat(poll_state->mailbox, "@");
+		strcat(poll_state->mailbox, vmu->context);
+	}
+
+	AST_RWLIST_INSERT_TAIL(&poll_list, poll_state, entry);
+
+	poll_mailbox(poll_state);
+
+	return 0;
+}
+
 static struct ast_vm_user *find_or_create(const char *context, const char *box)
 {
 	struct ast_vm_user *vmu;
@@ -12232,6 +12249,7 @@
 	ast_copy_string(vmu->mailbox, box, sizeof(vmu->mailbox));
 
 	AST_LIST_INSERT_TAIL(&users, vmu, list);
+	append_poll_list(vmu);
 
 	return vmu;
 }
@@ -13037,38 +13055,44 @@
 	AST_DATA_ENTRY("asterisk/application/voicemail/list", &vm_users_data_provider)
 };
 
-static void poll_subscribed_mailbox(struct mwi_sub *mwi_sub)
+static void poll_mailbox(struct poll_state *poll_state)
 {
 	int new = 0, old = 0, urgent = 0;
 
-	inboxcount2(mwi_sub->mailbox, &urgent, &new, &old);
+
+	inboxcount2(poll_state->mailbox, &urgent, &new, &old);
+	ast_debug(4, "Polled mailbox '%s' urgent: %d  new: %d  old: %d\n",
+		poll_state->mailbox, urgent, new, old);
 
 #ifdef IMAP_STORAGE
 	if (imap_poll_logout) {
-		imap_logout(mwi_sub->mailbox);
+		imap_logout(poll_state->mailbox);
 	}
 #endif
 
-	if (urgent != mwi_sub->old_urgent || new != mwi_sub->old_new || old != mwi_sub->old_old) {
-		mwi_sub->old_urgent = urgent;
-		mwi_sub->old_new = new;
-		mwi_sub->old_old = old;
-		queue_mwi_event(NULL, mwi_sub->mailbox, urgent, new, old);
-		run_externnotify(NULL, mwi_sub->mailbox, NULL);
+	if (urgent != poll_state->old_urgent || new != poll_state->old_new || old != poll_state->old_old) {
+		ast_debug(4, "Notifying subscribers of mailbox '%s' urgent: %d  new: %d  old: %d\n",
+			poll_state->mailbox, urgent, new, old);
+		poll_state->old_urgent = urgent;
+		poll_state->old_new = new;
+		poll_state->old_old = old;
+		queue_mwi_event(NULL, poll_state->mailbox, urgent, new, old);
+		run_externnotify(NULL, poll_state->mailbox, NULL);
 	}
 }
 
-static void poll_subscribed_mailboxes(void)
+static void poll_all_mailboxes(void)
 {
-	struct mwi_sub *mwi_sub;
+	struct poll_state *poll_state;
 
-	AST_RWLIST_RDLOCK(&mwi_subs);
-	AST_RWLIST_TRAVERSE(&mwi_subs, mwi_sub, entry) {
-		if (!ast_strlen_zero(mwi_sub->mailbox)) {
-			poll_subscribed_mailbox(mwi_sub);
+	ast_debug(3, "Polling mailboxes\n");
+	AST_RWLIST_RDLOCK(&poll_list);
+	AST_RWLIST_TRAVERSE(&poll_list, poll_state, entry) {
+		if (!ast_strlen_zero(poll_state->mailbox)) {
+			poll_mailbox(poll_state);
 		}
 	}
-	AST_RWLIST_UNLOCK(&mwi_subs);
+	AST_RWLIST_UNLOCK(&poll_list);
 }
 
 static void *mb_poll_thread(void *data)
@@ -13088,18 +13112,12 @@
 		if (!poll_thread_run)
 			break;
 
-		poll_subscribed_mailboxes();
+		poll_all_mailboxes();
 	}
 
 	return NULL;
 }
 
-static void mwi_sub_destroy(struct mwi_sub *mwi_sub)
-{
-	ast_free(mwi_sub->uniqueid);
-	ast_free(mwi_sub);
-}
-
 #ifdef IMAP_STORAGE
 static void imap_logout(const char *mailbox_id)
 {
@@ -13137,153 +13155,21 @@
 
 static void imap_close_subscribed_mailboxes(void)
 {
-	struct mwi_sub *mwi_sub;
+	struct poll_state *poll_state;
 
-	AST_RWLIST_RDLOCK(&mwi_subs);
-	AST_RWLIST_TRAVERSE(&mwi_subs, mwi_sub, entry) {
-		if (!ast_strlen_zero(mwi_sub->mailbox)) {
-			imap_logout(mwi_sub->mailbox);
+	AST_RWLIST_RDLOCK(&poll_list);
+	AST_RWLIST_TRAVERSE(&poll_list, poll_state, entry) {
+		if (!ast_strlen_zero(poll_state->mailbox)) {
+			imap_logout(poll_state->mailbox);
 		}
 	}
-	AST_RWLIST_UNLOCK(&mwi_subs);
+	AST_RWLIST_UNLOCK(&poll_list);
 }
 #endif
 
-static int handle_unsubscribe(void *datap)
-{
-	struct mwi_sub *mwi_sub;
-	char *uniqueid = datap;
-
-	AST_RWLIST_WRLOCK(&mwi_subs);
-	AST_RWLIST_TRAVERSE_SAFE_BEGIN(&mwi_subs, mwi_sub, entry) {
-		if (!strcmp(mwi_sub->uniqueid, uniqueid)) {
-			AST_LIST_REMOVE_CURRENT(entry);
-			/* Don't break here since a duplicate uniqueid
-			 * may have been added as a result of a cache dump. */
-#ifdef IMAP_STORAGE
-			imap_logout(mwi_sub->mailbox);
-#endif
-			mwi_sub_destroy(mwi_sub);
-		}
-	}
-	AST_RWLIST_TRAVERSE_SAFE_END
-	AST_RWLIST_UNLOCK(&mwi_subs);
-
-	ast_free(uniqueid);
-	return 0;
-}
-
-static int handle_subscribe(void *datap)
-{
-	unsigned int len;
-	struct mwi_sub *mwi_sub;
-	struct mwi_sub_task *p = datap;
-
-	len = sizeof(*mwi_sub) + 1;
-	if (!ast_strlen_zero(p->mailbox))
-		len += strlen(p->mailbox);
-
-	if (!ast_strlen_zero(p->context))
-		len += strlen(p->context) + 1; /* Allow for seperator */
-
-	if (!(mwi_sub = ast_calloc(1, len)))
-		return -1;
-
-	mwi_sub->uniqueid = ast_strdup(p->uniqueid);
-	if (!ast_strlen_zero(p->mailbox))
-		strcpy(mwi_sub->mailbox, p->mailbox);
-
-	if (!ast_strlen_zero(p->context)) {
-		strcat(mwi_sub->mailbox, "@");
-		strcat(mwi_sub->mailbox, p->context);
-	}
-
-	AST_RWLIST_WRLOCK(&mwi_subs);
-	AST_RWLIST_INSERT_TAIL(&mwi_subs, mwi_sub, entry);
-	AST_RWLIST_UNLOCK(&mwi_subs);
-	mwi_sub_task_dtor(p);
-	poll_subscribed_mailbox(mwi_sub);
-	return 0;
-}
-
-static void mwi_unsub_event_cb(struct stasis_subscription_change *change)
-{
-	char *uniqueid = ast_strdup(change->uniqueid);
-
-	if (!uniqueid) {
-		ast_log(LOG_ERROR, "Unable to allocate memory for uniqueid\n");
-		return;
-	}
-
-	if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
-		ast_free(uniqueid);
-	}
-}
-
-static void mwi_sub_event_cb(struct stasis_subscription_change *change)
-{
-	struct mwi_sub_task *mwist;
-	char *context;
-	char *mailbox;
-
-	mwist = ast_calloc(1, (sizeof(*mwist)));
-	if (!mwist) {
-		return;
-	}
-
-	if (separate_mailbox(ast_strdupa(stasis_topic_name(change->topic)), &mailbox, &context)) {
-		ast_free(mwist);
-		return;
-	}
-
-	mwist->mailbox = ast_strdup(mailbox);
-	mwist->context = ast_strdup(context);
-	mwist->uniqueid = ast_strdup(change->uniqueid);
-
-	if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
-		mwi_sub_task_dtor(mwist);
-	}
-}
-
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
-{
-	struct stasis_subscription_change *change;
-	/* Only looking for subscription change notices here */
-	if (stasis_message_type(msg) != stasis_subscription_change_type()) {
-		return;
-	}
-
-	change = stasis_message_data(msg);
-	if (change->topic == ast_mwi_topic_all()) {
-		return;
-	}
-
-	if (!strcmp(change->description, "Subscribe")) {
-		mwi_sub_event_cb(change);
-	} else if (!strcmp(change->description, "Unsubscribe")) {
-		mwi_unsub_event_cb(change);
-	}
-}
-
-static int dump_cache(void *obj, void *arg, int flags)
-{
-	struct stasis_message *msg = obj;
-	mwi_event_cb(NULL, NULL, msg);
-	return 0;
-}
-
 static void start_poll_thread(void)
 {
 	int errcode;
-	mwi_sub_sub = stasis_subscribe(ast_mwi_topic_all(), mwi_event_cb, NULL);
-
-	if (mwi_sub_sub) {
-		struct ao2_container *cached = stasis_cache_dump(ast_mwi_state_cache(), stasis_subscription_change_type());
-		if (cached) {
-			ao2_callback(cached, OBJ_MULTIPLE | OBJ_NODATA, dump_cache, NULL);
-		}
-		ao2_cleanup(cached);
-	}
 
 	poll_thread_run = 1;
 
@@ -13296,8 +13182,6 @@
 {
 	poll_thread_run = 0;
 
-	mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub);
-
 	ast_mutex_lock(&poll_lock);
 	ast_cond_signal(&poll_cond);
 	ast_mutex_unlock(&poll_lock);
@@ -13311,34 +13195,34 @@
 {
 	const char *context = astman_get_header(m, "Context");
 	const char *mailbox = astman_get_header(m, "Mailbox");
-	struct mwi_sub *mwi_sub;
+	struct poll_state *poll_state;
 	const char *at;
 
-	AST_RWLIST_RDLOCK(&mwi_subs);
-	AST_RWLIST_TRAVERSE(&mwi_subs, mwi_sub, entry) {
-		if (!ast_strlen_zero(mwi_sub->mailbox)) {
+	AST_RWLIST_RDLOCK(&poll_list);
+	AST_RWLIST_TRAVERSE(&poll_list, poll_state, entry) {
+		if (!ast_strlen_zero(poll_state->mailbox)) {
 			if (
 				/* First case: everything matches */
 				(ast_strlen_zero(context) && ast_strlen_zero(mailbox)) ||
 				/* Second case: match the mailbox only */
 				(ast_strlen_zero(context) && !ast_strlen_zero(mailbox) &&
-					(at = strchr(mwi_sub->mailbox, '@')) &&
-					strncmp(mailbox, mwi_sub->mailbox, at - mwi_sub->mailbox) == 0) ||
+					(at = strchr(poll_state->mailbox, '@')) &&
+					strncmp(mailbox, poll_state->mailbox, at - poll_state->mailbox) == 0) ||
 				/* Third case: match the context only */
 				(!ast_strlen_zero(context) && ast_strlen_zero(mailbox) &&
-					(at = strchr(mwi_sub->mailbox, '@')) &&
+					(at = strchr(poll_state->mailbox, '@')) &&
 					strcmp(context, at + 1) == 0) ||
 				/* Final case: match an exact specified mailbox */
 				(!ast_strlen_zero(context) && !ast_strlen_zero(mailbox) &&
-					(at = strchr(mwi_sub->mailbox, '@')) &&
-					strncmp(mailbox, mwi_sub->mailbox, at - mwi_sub->mailbox) == 0 &&
+					(at = strchr(poll_state->mailbox, '@')) &&
+					strncmp(mailbox, poll_state->mailbox, at - poll_state->mailbox) == 0 &&
 					strcmp(context, at + 1) == 0)
 			) {
-				poll_subscribed_mailbox(mwi_sub);
+				poll_mailbox(poll_state);
 			}
 		}
 	}
-	AST_RWLIST_UNLOCK(&mwi_subs);
+	AST_RWLIST_UNLOCK(&poll_list);
 	astman_send_ack(s, m, "Refresh sent");
 	return RESULT_SUCCESS;
 }
@@ -13487,6 +13371,17 @@
 	AST_LIST_UNLOCK(&users);
 }
 
+/*! \brief Free poll list. */
+static void free_poll_list(void)
+{
+	struct poll_state *current;
+	AST_RWLIST_WRLOCK(&poll_list);
+	while ((current = AST_RWLIST_REMOVE_HEAD(&poll_list, entry))) {
+		ast_free(current);
+	}
+	AST_RWLIST_UNLOCK(&poll_list);
+}
+
 /*! \brief Free the zones structure. */
 static void free_vm_zones(void)
 {
@@ -13604,8 +13499,6 @@
 	int x;
 	unsigned int tmpadsi[4];
 	char secretfn[PATH_MAX] = "";
-	long tps_queue_low;
-	long tps_queue_high;
 
 #ifdef IMAP_STORAGE
 	ast_copy_string(imapparentfolder, "\0", sizeof(imapparentfolder));
@@ -13621,12 +13514,20 @@
 	imap_close_subscribed_mailboxes();
 #endif
 
+	if (poll_thread != AST_PTHREADT_NULL) {
+		stop_poll_thread();
+	}
+
+	/* Free the poll list */
+	free_poll_list();
+
 	/* Free all the users structure */
 	free_vm_users();
 
 	/* Free all the zones structure */
 	free_vm_zones();
 
+	AST_RWLIST_WRLOCK(&poll_list);
 	AST_LIST_LOCK(&users);
 
 	memset(ext_pass_cmd, 0, sizeof(ext_pass_cmd));
@@ -14194,23 +14095,11 @@
 			pagerbody = ast_strdup(substitute_escapes(val));
 		}
 
-		tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
 		if ((val = ast_variable_retrieve(cfg, "general", "tps_queue_high"))) {
-			if (sscanf(val, "%30ld", &tps_queue_high) != 1 || tps_queue_high <= 0) {
-				ast_log(AST_LOG_WARNING, "Invalid the taskprocessor high water alert trigger level '%s'\n", val);
-				tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
-			}
+			ast_log(LOG_NOTICE, "Parameter tps_queue_high is obsolete and will be ignored\n");
 		}
-		tps_queue_low = -1;
 		if ((val = ast_variable_retrieve(cfg, "general", "tps_queue_low"))) {
-			if (sscanf(val, "%30ld", &tps_queue_low) != 1 ||
-				tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
-				ast_log(AST_LOG_WARNING, "Invalid the taskprocessor low water clear alert level '%s'\n", val);
-				tps_queue_low = -1;
-			}
-		}
-		if (ast_taskprocessor_alert_set_levels(mwi_subscription_tps, tps_queue_low, tps_queue_high)) {
-			ast_log(AST_LOG_WARNING, "Failed to set alert levels for voicemail taskprocessor.\n");
+			ast_log(LOG_NOTICE, "Parameter tps_queue_low is obsolete and will be ignored\n");
 		}
 
 		/* load mailboxes from users.conf */
@@ -14280,15 +14169,15 @@
 		}
 
 		AST_LIST_UNLOCK(&users);
+		AST_RWLIST_UNLOCK(&poll_list);
 
 		if (poll_mailboxes && poll_thread == AST_PTHREADT_NULL)
 			start_poll_thread();
-		if (!poll_mailboxes && poll_thread != AST_PTHREADT_NULL)
-			stop_poll_thread();;
 
 		return 0;
 	} else {
 		AST_LIST_UNLOCK(&users);
+		AST_RWLIST_UNLOCK(&poll_list);
 		ast_log(AST_LOG_WARNING, "Failed to load configuration file.\n");
 		return 0;
 	}
@@ -15016,13 +14905,13 @@
 	if (poll_thread != AST_PTHREADT_NULL)
 		stop_poll_thread();
 
-	mwi_subscription_tps = ast_taskprocessor_unreference(mwi_subscription_tps);
 	ast_unload_realtime("voicemail");
 	ast_unload_realtime("voicemail_data");
 
 #ifdef IMAP_STORAGE
 	imap_close_subscribed_mailboxes();
 #endif
+	free_poll_list();
 	free_vm_users();
 	free_vm_zones();
 	return res;
@@ -15055,10 +14944,6 @@
 	/* compute the location of the voicemail spool directory */
 	snprintf(VM_SPOOL_DIR, sizeof(VM_SPOOL_DIR), "%s/voicemail/", ast_config_AST_SPOOL_DIR);
 
-	if (!(mwi_subscription_tps = ast_taskprocessor_get("app_voicemail", 0))) {
-		ast_log(AST_LOG_WARNING, "failed to reference mwi subscription taskprocessor.  MWI will not work\n");
-	}
-
 	if ((res = load_config(0))) {
 		unload_module();
 		return AST_MODULE_LOAD_DECLINE;

-- 
To view, visit https://gerrit.asterisk.org/10132
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-MessageType: newchange
Gerrit-Change-Id: I5cceb737246949f9782955c64425b8bd25a9e9ee
Gerrit-Change-Number: 10132
Gerrit-PatchSet: 1
Gerrit-Owner: George Joseph <gjoseph at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20180913/18e684de/attachment-0001.html>


More information about the asterisk-code-review mailing list