[Asterisk-code-review] Frame deferral: Will this be the last rework on it? (asterisk[13])

Richard Mudgett asteriskteam at digium.com
Mon Jan 23 15:45:18 CST 2017


Richard Mudgett has uploaded a new change for review. ( https://gerrit.asterisk.org/4771 )

Change subject: Frame deferral: Will this be the last rework on it?
......................................................................

Frame deferral: Will this be the last rework on it?

There are several issues with deferring frames that are handled by this
patch.

1) Using the timerfd timing module can cause channel freezing, lingering,
or deadlock issues.  The cause is that timerfd is the only timing module
that uses an associated alert-pipe.  When the alert-pipe becomes
unbalanced with respect to the number of frames in the read queue, bad
things can happen.  If the alert-pipe has fewer alerts queued than the
read queue has frames, then nothing might wake up the thread to handle
received frames from the channel driver.  For local channels this is the
only way to wake up the thread to handle received frames.  Being
unbalanced in the other direction is less of an issue as it will cause
unnecessary reads into the channel driver.

* In channel.c:__ast_queue_frame(): Adding frame lists to the read queue
did not add the same number of alerts to the alert-pipe.  Correspondingly,
when there is an exceptionally long queue event, any removed frames did
not also remove the corresponding number of alerts from the alert-pipe.
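
The balance rule above can be sketched with a toy model.  This is not the
Asterisk code: struct toy_chan and the toy_* functions are hypothetical
names, and plain counters stand in for the real read queue and alert-pipe.
It only illustrates the invariant the patch restores, namely one alert
written per queued frame and one alert read per flushed frame.

```c
#include <assert.h>

/* Hypothetical toy channel: counters stand in for the real queue and pipe. */
struct toy_chan {
	unsigned int readq_len; /* frames waiting in the read queue */
	unsigned int alerts;    /* wakeups pending on the alert-pipe */
};

/* Queue a list of new_frames frames, writing one alert per added frame. */
static void toy_queue_frames(struct toy_chan *chan, unsigned int new_frames)
{
	assert(chan != 0);
	chan->readq_len += new_frames;
	while (new_frames--) {
		chan->alerts++;
	}
}

/* Flush up to count frames, reading one alert for each flushed frame. */
static unsigned int toy_flush_frames(struct toy_chan *chan, unsigned int count)
{
	unsigned int flushed = 0;

	assert(chan != 0);
	while (flushed < count && chan->readq_len > 0) {
		chan->readq_len--;
		if (chan->alerts > 0) {
			chan->alerts--;
		}
		flushed++;
	}
	return flushed;
}

/* The queue and the pipe are balanced when the two counts match. */
static int toy_is_balanced(const struct toy_chan *chan)
{
	return chan->alerts == chan->readq_len;
}
```

In this model, queueing a list of three frames and then flushing two of
them leaves one frame and one alert, so a waiting thread would still be
woken for the remaining frame.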

2) Deferrable frames can come directly from the channel driver as well as
the read queue.  These frames need to be added to the deferred queue.

* In channel.c extracted read_core() out of __ast_read() so __ast_read()
can save off deferrable frames from any source.

3) Whoever is deferring frames is really only doing the __ast_read() to
collect deferred frames and doesn't care about the returned frames except
to detect a hangup event.  When frame deferral is completed we must make
the normal frame processing see the hangup as a frame anyway.  As such,
there is no need to have varying hangup frame deferral methods.

* Removed the defer_hangups parameter from
ast_channel_start_defer_frames() as it is no longer needed.

* Made channel.c:ast_channel_destructor() destroy any frames that may
still exist in the deferred read queue.  This is a fail safe as there
should never be any frames remaining in the deferred queue when a channel
is destroyed.

* Added a comment note in channel.c:channel_do_masquerade() that the
deferred read queue does not participate in masquerades.
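
The read-side decision described in (2) and (3) can be sketched as a toy
model.  This is not the patch itself: the toy_* names, the fixed-size
array, and the choice of DTMF as the only deferrable frame kind are
assumptions made for illustration.  The last array entry plays the role of
the deferred queue head.

```c
#include <assert.h>
#include <stddef.h>

enum toy_kind { TOY_VOICE, TOY_DTMF, TOY_HANGUP };
enum toy_result { TOY_SEES_FRAME, TOY_SEES_NULL_FRAME, TOY_SEES_HANGUP };

struct toy_frame { enum toy_kind kind; };

#define TOY_MAX_DEFERRED 16

struct toy_chan {
	int deferring;
	/* Newest entry is last; it plays the role of the list head. */
	enum toy_kind deferred[TOY_MAX_DEFERRED];
	size_t deferred_count;
};

/* Assumption for the sketch: only DTMF counts as deferrable. */
static int toy_is_deferrable(enum toy_kind kind)
{
	return kind == TOY_DTMF;
}

/* Save a frame kind for later; silently drop it if the array is full. */
static void toy_defer(struct toy_chan *chan, enum toy_kind kind)
{
	if (chan->deferred_count < TOY_MAX_DEFERRED) {
		chan->deferred[chan->deferred_count++] = kind;
	}
}

/* Decide what the reader sees for frame f (NULL means hangup). */
static enum toy_result toy_read(struct toy_chan *chan, const struct toy_frame *f)
{
	assert(chan != NULL);
	if (!chan->deferring) {
		return (!f || f->kind == TOY_HANGUP) ? TOY_SEES_HANGUP : TOY_SEES_FRAME;
	}
	if (chan->deferred_count > 0
		&& chan->deferred[chan->deferred_count - 1] == TOY_HANGUP) {
		/* A hangup was already seen, so keep reporting hangup. */
		return TOY_SEES_HANGUP;
	}
	if (!f || f->kind == TOY_HANGUP) {
		/* Report the hangup now and save a copy for normal processing. */
		toy_defer(chan, TOY_HANGUP);
		return TOY_SEES_HANGUP;
	}
	if (toy_is_deferrable(f->kind)) {
		/* The reader gets a null frame; the real frame is saved. */
		toy_defer(chan, f->kind);
		return TOY_SEES_NULL_FRAME;
	}
	return TOY_SEES_FRAME;
}
```

The point of the sketch is that the frame's source no longer matters: the
same decision is applied to whatever the core read returned, and a hangup
is both reported immediately and saved exactly once.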

4) Properly dealing with deferrable frames from the channel driver, as
pointed out by (2) above, means that it is possible to process a dialplan
interception routine while frames are deferred because of the
AST_CONTROL_READ_ACTION control frame.  Deferring frames is not
implemented as a re-entrant operation, so you could have the unsupported
case of two sections of code thinking they have control of the media
stream.

A dialplan intercept routine is equivalent to an interrupt routine.  As
such, the routine must be done quickly and you do not have access to the
media stream.  These restrictions are necessary because the media stream
is the responsibility of some other code and interfering with or delaying
that processing is bad.  A possible future dialplan processing
architecture change may allow the interception routine to run in a
different thread from the main thread handling the media and remove the
execution time restriction.

* Made res_agi.c:run_agi() run an AGI in DeadAGI mode when it is executed
from an interception routine.  No touchy channel frames.
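
The intercept-mode idea can be sketched as a per-thread flag that is set
around the routine and consulted by code that would otherwise read channel
frames.  This is a simplified stand-in, not the patch: it uses the C11
_Thread_local keyword instead of the Asterisk thread storage API, and
every toy_* name is hypothetical.

```c
#include <assert.h>

/* Per-thread flag: non-zero while this thread runs an intercept routine. */
static _Thread_local int toy_in_intercept;

static void toy_set_intercept_mode(int on)
{
	toy_in_intercept = on ? 1 : 0;
}

static int toy_get_intercept_mode(void)
{
	return toy_in_intercept;
}

/* An AGI-like caller may read frames only if not dead and not intercepting. */
static int toy_agi_may_read_frames(int dead)
{
	return !(dead || toy_get_intercept_mode());
}

/* Run a routine with the flag set, clearing it again afterwards. */
static int toy_run_intercept(int (*routine)(void))
{
	int res;

	assert(routine != 0);
	toy_set_intercept_mode(1);
	res = routine();
	toy_set_intercept_mode(0);
	return res;
}

/* Sample routine: reports whether it would be allowed to read frames. */
static int toy_sample_routine(void)
{
	return toy_agi_may_read_frames(0);
}
```

Because the flag is per-thread, only code running on the thread that
executes the routine sees intercept mode; other threads handling media are
unaffected.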

ASTERISK-26716 #close

Change-Id: If8383a8b23ba9a335c138a9c0e79fca24b78343c
---
M include/asterisk/channel.h
M main/autoservice.c
M main/channel.c
M res/res_agi.c
4 files changed, 160 insertions(+), 81 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/71/4771/1

diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h
index a7a3a64..6f64df2 100644
--- a/include/asterisk/channel.h
+++ b/include/asterisk/channel.h
@@ -971,11 +971,6 @@
 	 * frames should be deferred.
 	 */
 	AST_FLAG_DEFER_FRAMES = (1 << 28),
-	/*!
-	 * The channel is currently deferring hangup frames
-	 * in addition to other frame types.
-	 */
-	AST_FLAG_DEFER_HANGUP_FRAMES = (1 << 29),
 };
 
 /*! \brief ast_bridge_config flags */
@@ -4721,17 +4716,11 @@
  * drop important frames. This function can be called so that important frames
  * will be deferred, rather than placed in the channel frame queue as normal.
  *
- * Hangups are an interesting frame type. Hangups will always be detectable by
- * a reader when a channel is deferring frames. If the defer_hangups parameter
- * is non-zero, then the hangup frame will also be duplicated and deferred, so
- * that the next reader of the channel will get the hangup frame, too.
- *
  * \pre chan MUST be locked before calling
  *
  * \param chan The channel on which frames should be deferred
- * \param defer_hangups Defer hangups in addition to other deferrable frames
  */
-void ast_channel_start_defer_frames(struct ast_channel *chan, int defer_hangups);
+void ast_channel_start_defer_frames(struct ast_channel *chan);
 
 /*!
  * \brief Stop deferring deferrable frames on this channel
@@ -4746,4 +4735,21 @@
  */
 void ast_channel_stop_defer_frames(struct ast_channel *chan);
 
+/*!
+ * \brief Am I currently running an intercept dialplan routine.
+ * \since 13.14.0
+ *
+ * \details
+ * A dialplan intercept routine is equivalent to an interrupt
+ * routine.  As such, the routine must be done quickly and you
+ * do not have access to the media stream.  These restrictions
+ * are necessary because the media stream is the responsibility
+ * of some other code and interfering with or delaying that
+ * processing is bad.
+ *
+ * \retval 0 Not in an intercept routine.
+ * \retval 1 In an intercept routine.
+ */
+int ast_channel_get_intercept_mode(void);
+
 #endif /* _ASTERISK_CHANNEL_H */
diff --git a/main/autoservice.c b/main/autoservice.c
index 81d267c..f26d8fc 100644
--- a/main/autoservice.c
+++ b/main/autoservice.c
@@ -167,7 +167,7 @@
 	as->orig_end_dtmf_flag = ast_test_flag(ast_channel_flags(chan), AST_FLAG_END_DTMF_ONLY) ? 1 : 0;
 	if (!as->orig_end_dtmf_flag)
 		ast_set_flag(ast_channel_flags(chan), AST_FLAG_END_DTMF_ONLY);
-	ast_channel_start_defer_frames(chan, 1);
+	ast_channel_start_defer_frames(chan);
 	ast_channel_unlock(chan);
 
 	AST_LIST_LOCK(&aslist);
diff --git a/main/channel.c b/main/channel.c
index 637488a..75f9bc4 100644
--- a/main/channel.c
+++ b/main/channel.c
@@ -1061,10 +1061,16 @@
 	return tmp;
 }
 
-void ast_channel_start_defer_frames(struct ast_channel *chan, int defer_hangups)
+void ast_channel_start_defer_frames(struct ast_channel *chan)
 {
+	/*
+	 * We cannot start deferring frames when it is already active.
+	 * Otherwise we would need to implement a deferred start count
+	 * to keep track and deal with other associated complications.
+	 */
+	ast_assert(!ast_test_flag(ast_channel_flags(chan), AST_FLAG_DEFER_FRAMES));
+
 	ast_set_flag(ast_channel_flags(chan), AST_FLAG_DEFER_FRAMES);
-	ast_set2_flag(ast_channel_flags(chan), defer_hangups, AST_FLAG_DEFER_HANGUP_FRAMES);
 }
 
 void ast_channel_stop_defer_frames(struct ast_channel *chan)
@@ -1166,6 +1172,9 @@
 				}
 				AST_LIST_REMOVE_CURRENT(frame_list);
 				ast_frfree(cur);
+
+				/* Read from the alert pipe for each flushed frame. */
+				ast_channel_internal_alert_read(chan);
 			}
 		}
 		AST_LIST_TRAVERSE_SAFE_END;
@@ -1182,9 +1191,13 @@
 	}
 
 	if (ast_channel_alert_writable(chan)) {
-		if (ast_channel_alert_write(chan)) {
-			ast_log(LOG_WARNING, "Unable to write to alert pipe on %s (qlen = %u): %s!\n",
-				ast_channel_name(chan), queued_frames, strerror(errno));
+		/* Write to the alert pipe for each added frame */
+		while (new_frames--) {
+			if (ast_channel_alert_write(chan)) {
+				ast_log(LOG_WARNING, "Unable to write to alert pipe on %s (qlen = %u): %s!\n",
+					ast_channel_name(chan), queued_frames, strerror(errno));
+				break;
+			}
 		}
 	} else if (ast_channel_timingfd(chan) > -1) {
 		ast_timer_enable_continuous(ast_channel_timer(chan));
@@ -1550,7 +1563,7 @@
 	}
 
 	ast_channel_lock(chan);
-	ast_channel_start_defer_frames(chan, 0);
+	ast_channel_start_defer_frames(chan);
 	ast_channel_unlock(chan);
 
 	start = ast_tvnow();
@@ -2310,8 +2323,12 @@
 	}
 	close(ast_channel_epfd(chan));
 #endif
-	while ((f = AST_LIST_REMOVE_HEAD(ast_channel_readq(chan), frame_list)))
+	while ((f = AST_LIST_REMOVE_HEAD(ast_channel_readq(chan), frame_list))) {
 		ast_frfree(f);
+	}
+	while ((f = AST_LIST_REMOVE_HEAD(ast_channel_deferred_readq(chan), frame_list))) {
+		ast_frfree(f);
+	}
 
 	/* loop over the variables list, freeing all data and deleting list items */
 	/* no need to lock the list, as the channel is already locked */
@@ -3757,16 +3774,17 @@
 	return samples;
 }
 
-static struct ast_frame *__ast_read(struct ast_channel *chan, int dropaudio)
+static struct ast_frame *read_core(struct ast_channel *chan, int dropaudio)
 {
 	struct ast_frame *f = NULL;	/* the return value */
 	int prestate;
 	int cause = 0;
 
-	/* this function is very long so make sure there is only one return
-	 * point at the end (there are only two exceptions to this).
+	/*
+	 * On entry the channel is locked so it must return locked.
+	 * This function is very long so make sure there is only one return
+	 * point at the end.
 	 */
-	ast_channel_lock(chan);
 
 	/* Stop if we're a zombie or need a soft hangup */
 	if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_ZOMBIE) || ast_check_hangup(chan)) {
@@ -3840,14 +3858,14 @@
 				if (got_ref) {
 					ao2_ref(data, -1);
 				}
+				ast_channel_lock(chan);
 			} else {
 				ast_timer_set_rate(ast_channel_timer(chan), 0);
 				ast_channel_fdno_set(chan, -1);
-				ast_channel_unlock(chan);
 			}
 
-			/* cannot 'goto done' because the channel is already unlocked */
-			return &ast_null_frame;
+			f = &ast_null_frame;
+			goto done;
 
 		case AST_TIMING_EVENT_CONTINUOUS:
 			if (AST_LIST_EMPTY(ast_channel_readq(chan)) ||
@@ -3882,36 +3900,6 @@
 	/* Check for pending read queue */
 	if (!AST_LIST_EMPTY(ast_channel_readq(chan))) {
 		int skip_dtmf = should_skip_dtmf(chan);
-
-		if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DEFER_FRAMES)) {
-			AST_LIST_TRAVERSE_SAFE_BEGIN(ast_channel_readq(chan), f, frame_list) {
-				if (ast_is_deferrable_frame(f)) {
-					if(f->frametype == AST_FRAME_CONTROL && 
-						(f->subclass.integer == AST_CONTROL_HANGUP ||
-						 f->subclass.integer == AST_CONTROL_END_OF_Q)) {
-						/* Hangup is a special case. We want to defer the frame, but we also do not
-						 * want to remove it from the frame queue. So rather than just moving the frame
-						 * over, we duplicate it and move the copy to the deferred readq.
-						 *
-						 * The reason for this? This way, whoever calls ast_read() will get a NULL return
-						 * immediately and can tell the channel has hung up and do what it needs to. Also,
-						 * when frame deferral finishes, then whoever calls ast_read() next will also get
-						 * the hangup.
-						 */
-						if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DEFER_HANGUP_FRAMES)) {
-							struct ast_frame *dup;
-
-							dup = ast_frdup(f);
-							AST_LIST_INSERT_HEAD(ast_channel_deferred_readq(chan), dup, frame_list);
-						}
-					} else {
-						AST_LIST_INSERT_HEAD(ast_channel_deferred_readq(chan), f, frame_list);
-						AST_LIST_REMOVE_CURRENT(frame_list);
-					}
-				}
-			}
-			AST_LIST_TRAVERSE_SAFE_END;
-		}
 
 		AST_LIST_TRAVERSE_SAFE_BEGIN(ast_channel_readq(chan), f, frame_list) {
 			/* We have to be picky about which frame we pull off of the readq because
@@ -4387,6 +4375,70 @@
 		ast_audiohook_detach_list(ast_channel_audiohooks(chan));
 		ast_channel_audiohooks_set(chan, NULL);
 	}
+	return f;
+}
+
+static struct ast_frame *__ast_read(struct ast_channel *chan, int dropaudio)
+{
+	struct ast_frame *f;
+
+	ast_channel_lock(chan);
+
+	f = read_core(chan, dropaudio);
+
+	if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DEFER_FRAMES)) {
+		struct ast_frame *dup;
+		struct ast_frame *head;
+
+		/*
+		 * Whoever is deferring frames is really only doing the read to
+		 * collect deferred frames and doesn't care about the returned
+		 * frames except to detect a hangup event.
+		 */
+
+		head = AST_LIST_FIRST(ast_channel_deferred_readq(chan));
+		if (head
+			&& head->frametype == AST_FRAME_CONTROL
+			&& head->subclass.integer == AST_CONTROL_HANGUP) {
+			/* We have already seen a hangup so continue to report hangup. */
+			if (f) {
+				ast_frfree(f);
+				f = NULL;
+			}
+		} else if (!f || (f->frametype == AST_FRAME_CONTROL
+			&& f->subclass.integer == AST_CONTROL_HANGUP)) {
+			static struct ast_frame hangup_frame = {
+				.frametype = AST_FRAME_CONTROL,
+				.subclass.integer = AST_CONTROL_HANGUP,
+			};
+
+			/*
+			 * Hangup is a special case.  Whoever is deferring frames
+			 * needs to know when the hangup happens so it can do what it
+			 * needs to do.  Then when frame deferral finishes the normal
+			 * frame processing will also need to get the hangup after
+			 * processing any deferred frames.
+			 *
+			 * Need to copy the hangup frame in case there is a hangup cause.
+			 */
+			dup = ast_frdup(f ?: &hangup_frame);
+			if (dup) {
+				AST_LIST_INSERT_HEAD(ast_channel_deferred_readq(chan), dup, frame_list);
+			}
+			if (f) {
+				ast_frfree(f);
+				f = NULL;
+			}
+		} else if (ast_is_deferrable_frame(f)) {
+			dup = ast_frdup(f);
+			if (dup) {
+				AST_LIST_INSERT_HEAD(ast_channel_deferred_readq(chan), dup, frame_list);
+			}
+			ast_frfree(f);
+			f = &ast_null_frame;
+		}
+	}
+
 	ast_channel_unlock(chan);
 	return f;
 }
@@ -6868,6 +6920,7 @@
 			}
 		}
 	}
+	/* Deferred readq's do not participate in masquerades */
 
 	/* Swap the raw formats */
 	tmp_format = ao2_bump(ast_channel_rawreadformat(original));
@@ -10254,6 +10307,36 @@
 	ast_queue_control_data(chan, AST_CONTROL_REDIRECTING, data, datalen);
 }
 
+/*!
+ * Storage to determine if the current thread is running an intercept dialplan routine.
+ */
+AST_THREADSTORAGE_RAW(in_intercept_routine);
+
+/*!
+ * \internal
+ * \brief Set the current intercept dialplan routine status mode.
+ * \since 13.14.0
+ *
+ * \param in_intercept_mode New intercept mode.  (Non-zero if in intercept mode)
+ *
+ * \return Nothing
+ */
+static void channel_set_intercept_mode(int in_intercept_mode)
+{
+	int status;
+
+	status = ast_threadstorage_set_ptr(&in_intercept_routine,
+		in_intercept_mode ? (void *) 1 : (void *) 0);
+	if (status) {
+		ast_log(LOG_ERROR, "Failed to set dialplan intercept mode\n");
+	}
+}
+
+int ast_channel_get_intercept_mode(void)
+{
+	return ast_threadstorage_get_ptr(&in_intercept_routine) ? 1 : 0;
+}
+
 int ast_channel_connected_line_macro(struct ast_channel *autoservice_chan, struct ast_channel *macro_chan, const void *connected_info, int is_caller, int is_frame)
 {
 	static int deprecation_warning = 0;
@@ -10287,14 +10370,11 @@
 
 		ast_party_connected_line_copy(ast_channel_connected(macro_chan), connected);
 	}
-	ast_channel_start_defer_frames(macro_chan, 0);
 	ast_channel_unlock(macro_chan);
 
+	channel_set_intercept_mode(1);
 	retval = ast_app_run_macro(autoservice_chan, macro_chan, macro, macro_args);
-
-	ast_channel_lock(macro_chan);
-	ast_channel_stop_defer_frames(macro_chan);
-	ast_channel_unlock(macro_chan);
+	channel_set_intercept_mode(0);
 
 	if (!retval) {
 		struct ast_party_connected_line saved_connected;
@@ -10343,14 +10423,11 @@
 
 		ast_party_redirecting_copy(ast_channel_redirecting(macro_chan), redirecting);
 	}
-	ast_channel_start_defer_frames(macro_chan, 0);
 	ast_channel_unlock(macro_chan);
 
+	channel_set_intercept_mode(1);
 	retval = ast_app_run_macro(autoservice_chan, macro_chan, macro, macro_args);
-
-	ast_channel_lock(macro_chan);
-	ast_channel_stop_defer_frames(macro_chan);
-	ast_channel_unlock(macro_chan);
+	channel_set_intercept_mode(0);
 
 	if (!retval) {
 		struct ast_party_redirecting saved_redirecting;
@@ -10392,14 +10469,11 @@
 
 		ast_party_connected_line_copy(ast_channel_connected(sub_chan), connected);
 	}
-	ast_channel_start_defer_frames(sub_chan, 0);
 	ast_channel_unlock(sub_chan);
 
+	channel_set_intercept_mode(1);
 	retval = ast_app_run_sub(autoservice_chan, sub_chan, sub, sub_args, 0);
-
-	ast_channel_lock(sub_chan);
-	ast_channel_stop_defer_frames(sub_chan);
-	ast_channel_unlock(sub_chan);
+	channel_set_intercept_mode(0);
 
 	if (!retval) {
 		struct ast_party_connected_line saved_connected;
@@ -10441,14 +10515,11 @@
 
 		ast_party_redirecting_copy(ast_channel_redirecting(sub_chan), redirecting);
 	}
-	ast_channel_start_defer_frames(sub_chan, 0);
 	ast_channel_unlock(sub_chan);
 
+	channel_set_intercept_mode(1);
 	retval = ast_app_run_sub(autoservice_chan, sub_chan, sub, sub_args, 0);
-
-	ast_channel_lock(sub_chan);
-	ast_channel_stop_defer_frames(sub_chan);
-	ast_channel_unlock(sub_chan);
+	channel_set_intercept_mode(0);
 
 	if (!retval) {
 		struct ast_party_redirecting saved_redirecting;
diff --git a/res/res_agi.c b/res/res_agi.c
index 969c62d..cd0d621 100644
--- a/res/res_agi.c
+++ b/res/res_agi.c
@@ -4075,7 +4075,7 @@
 			break;
 		}
 	} else if (c) {
-		ami_res = "Command Not Permitted on a dead channel";
+		ami_res = "Command Not Permitted on a dead channel or intercept routine";
 		resultcode = 511;
 
 		ast_agi_send(agi->fd, chan, "%d %s\n", resultcode, ami_res);
@@ -4111,6 +4111,8 @@
 	const char *sighup_str;
 	const char *exit_on_hangup_str;
 	int exit_on_hangup;
+	/*! Running in an interception routine is like DeadAGI mode.  No touchy the channel frames. */
+	int in_intercept = ast_channel_get_intercept_mode();
 
 	ast_channel_lock(chan);
 	sighup_str = pbx_builtin_getvar_helper(chan, "AGISIGHUP");
@@ -4145,7 +4147,7 @@
 			}
 		}
 		ms = -1;
-		if (dead) {
+		if (dead || in_intercept) {
 			c = ast_waitfor_nandfds(&chan, 0, &agi->ctrl, 1, NULL, &outfd, &ms);
 		} else if (!ast_check_hangup(chan)) {
 			c = ast_waitfor_nandfds(&chan, 1, &agi->ctrl, 1, NULL, &outfd, &ms);
@@ -4223,10 +4225,10 @@
 
 			if (agidebug)
 				ast_verbose("<%s>AGI Rx << %s\n", ast_channel_name(chan), buf);
-			cmd_status = agi_handle_command(chan, agi, buf, dead);
+			cmd_status = agi_handle_command(chan, agi, buf, dead || in_intercept);
 			switch (cmd_status) {
 			case AGI_RESULT_FAILURE:
-				if (dead || !ast_check_hangup(chan)) {
+				if (dead || in_intercept || !ast_check_hangup(chan)) {
 					/* The failure was not because of a hangup. */
 					returnstatus = AGI_RESULT_FAILURE;
 				}

-- 
To view, visit https://gerrit.asterisk.org/4771
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If8383a8b23ba9a335c138a9c0e79fca24b78343c
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Richard Mudgett <rmudgett at digium.com>


