[Asterisk-code-review] res pjsip: Deny requests when threadpool queue is backed up. (asterisk[certified/13.1])

Mark Michelson asteriskteam at digium.com
Wed Nov 11 17:18:23 CST 2015


Mark Michelson has uploaded a new change for review.

  https://gerrit.asterisk.org/1608

Change subject: res_pjsip: Deny requests when threadpool queue is backed up.
......................................................................

res_pjsip: Deny requests when threadpool queue is backed up.

We have observed situations where the SIP threadpool may become
deadlocked. However, because incoming traffic is still arriving, the SIP
threadpool's queue can continue to grow, eventually running the system
out of memory.

This change makes it so that incoming traffic gets rejected with a 503
response if the queue is backed up too much.

Change-Id: I4e736d48a2ba79fd1f8056c0dcd330e38e6a3816
---
M include/asterisk/res_pjsip.h
M include/asterisk/taskprocessor.h
M include/asterisk/threadpool.h
M main/taskprocessor.c
M main/threadpool.c
M res/res_pjsip.c
M res/res_pjsip/pjsip_distributor.c
7 files changed, 44 insertions(+), 7 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/08/1608/1

diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index af11ea4..0ce5091 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -2105,4 +2105,10 @@
  */
 const char *ast_sip_get_host_ip_string(int af);
 
+/*!
+ * \brief Return the size of the SIP threadpool's task queue
+ * \since 13.7.0
+ */
+long ast_sip_threadpool_queue_size(void);
+
 #endif /* _RES_PJSIP_H */
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f16f144..0636886 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -262,4 +262,10 @@
  */
 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps);
 
+/*!
+ * \brief Return the current size of the taskprocessor queue
+ * \since 13.7.0
+ */
+long ast_taskprocessor_size(struct ast_taskprocessor *tps);
+
 #endif /* __AST_TASKPROCESSOR_H__ */
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h
index 1c67058..9a09d17 100644
--- a/include/asterisk/threadpool.h
+++ b/include/asterisk/threadpool.h
@@ -239,4 +239,10 @@
  */
 struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool);
 
+/*!
+ * \brief Return the size of the threadpool's task queue
+ * \since 13.7.0
+ */
+long ast_threadpool_queue_size(struct ast_threadpool *pool);
+
 #endif /* ASTERISK_THREADPOOL_H */
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 227fc3e..cdac2c8 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -130,9 +130,6 @@
 /*! \brief Remove the front task off the taskprocessor queue */
 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
 
-/*! \brief Return the size of the taskprocessor queue */
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
-
 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 
@@ -508,7 +505,7 @@
 	return task;
 }
 
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
+long ast_taskprocessor_size(struct ast_taskprocessor *tps)
 {
 	return (tps) ? tps->tps_queue_size : -1;
 }
@@ -766,7 +763,7 @@
 {
 	struct ast_taskprocessor_local local;
 	struct tps_task *t;
-	int size;
+	long size;
 
 	ao2_lock(tps);
 	t = tps_taskprocessor_pop(tps);
@@ -798,7 +795,7 @@
 	 * after we pop an empty stack.
 	 */
 	tps->executing = 0;
-	size = tps_taskprocessor_depth(tps);
+	size = ast_taskprocessor_size(tps);
 	/* If we executed a task, bump the stats */
 	if (tps->stats) {
 		tps->stats->_tasks_processed_count++;
diff --git a/main/threadpool.c b/main/threadpool.c
index 6b412d2..f070654 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -1225,3 +1225,8 @@
 
 	return tps;
 }
+
+long ast_threadpool_queue_size(struct ast_threadpool *pool)
+{
+	return ast_taskprocessor_size(pool->tps);
+}
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 0b541af..88edd71 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -3408,6 +3408,10 @@
 	}
 }
 
+long ast_sip_threadpool_queue_size(void) {
+	return ast_threadpool_queue_size(sip_threadpool);
+}
+
 AST_TEST_DEFINE(xml_sanitization_end_null)
 {
 	char sanitized[8];
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index 1294379..cb97d22 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -245,6 +245,8 @@
 	.on_rx_request = endpoint_lookup,
 };
 
+#define SIP_MAX_QUEUE 500l
+
 static pj_bool_t distributor(pjsip_rx_data *rdata)
 {
 	pjsip_dialog *dlg = find_dialog(rdata);
@@ -279,7 +281,18 @@
 		clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
 	}
 
-	ast_sip_push_task(serializer, distribute, clone);
+	if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) {
+		/* This may mean an allocation failure, but it's more likely that
+		 * conditions have become unfavorable, resulting in a full queue.
+		 * It's in our best interest to send back a 503 response and be
+		 * done with it.
+		 */
+		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
+		ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
+		pjsip_rx_data_free_cloned(clone);
+	} else {
+		ast_sip_push_task(serializer, distribute, clone);
+	}
 
 end:
 	if (dlg) {

-- 
To view, visit https://gerrit.asterisk.org/1608
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4e736d48a2ba79fd1f8056c0dcd330e38e6a3816
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: certified/13.1
Gerrit-Owner: Mark Michelson <mmichelson at digium.com>



More information about the asterisk-code-review mailing list