[asterisk-commits] branch group/sip-threading r26805 - /team/group/sip-threading/channels/

asterisk-commits at lists.digium.com asterisk-commits at lists.digium.com
Thu May 11 02:40:35 MST 2006


Author: file
Date: Thu May 11 04:40:35 2006
New Revision: 26805

URL: http://svn.digium.com/view/asterisk?rev=26805&view=rev
Log:
Next round of changes. Utilizes the scheduler more, moves MWI to a separate thread, gets rid of some locking, plus doesn't iterate through the entire pvt list every time (it potentially even went through multiple times)

Modified:
    team/group/sip-threading/channels/chan_sip.c

Modified: team/group/sip-threading/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/group/sip-threading/channels/chan_sip.c?rev=26805&r1=26804&r2=26805&view=diff
==============================================================================
--- team/group/sip-threading/channels/chan_sip.c (original)
+++ team/group/sip-threading/channels/chan_sip.c Thu May 11 04:40:35 2006
@@ -211,6 +211,11 @@
 #define RTP 	1
 #define NO_RTP	0
 
+/*! \brief SIP pvt state request change types (currently only SIP_STATE_DESTROY is available) */
+enum sippvtstates {
+	STATE_DESTROY,
+};
+
 /*! \brief Authorization scheme for call transfers 
 \note Not a bitfield flag, since there are plans for other modes,
 	like "only allow transfers for authenticated devices" */
@@ -487,17 +492,14 @@
 
 AST_MUTEX_DEFINE_STATIC(usecnt_lock);
 
-/*! \brief Protect the monitoring thread, so only one process can kill or start it, and not
-   when it's doing something critical. */
-AST_MUTEX_DEFINE_STATIC(netlock);
-
-AST_MUTEX_DEFINE_STATIC(monlock);
-
 AST_MUTEX_DEFINE_STATIC(sip_reload_lock);
 
 /*! \brief This is the thread for the monitor which checks for input on the channels
    which are not currently in use.  */
 static pthread_t monitor_thread = AST_PTHREADT_NULL;
+
+/*! \brief This is the thread used for MWI notifications - it polls the mailbox for peers if applicable */
+static pthread_t mwi_thread = AST_PTHREADT_NULL;
 
 static int sip_reloading = FALSE;			/*!< Flag for avoiding multiple reloads at the same time */
 static enum channelreloadreason sip_reloadreason;	/*!< Reason for last reload/load of configuration */
@@ -823,11 +825,14 @@
 	int rtptimeout;				/*!< RTP timeout time */
 	int rtpholdtimeout;			/*!< RTP timeout when on hold */
 	int rtpkeepalive;			/*!< Send RTP packets for keepalive */
+	int rtpschedid;                         /*!< Scheduled item to check on the RTP stream */
 	enum transfermodes allowtransfer;	/*! SIP Refer restriction scheme */
 	enum subscriptiontype subscribed;	/*!< SUBSCRIBE: Is this dialog a subscription?  */
 	int stateid;				/*!< SUBSCRIBE: ID for devicestate subscriptions */
 	int laststate;				/*!< SUBSCRIBE: Last known extension state */
 	int dialogver;				/*!< SUBSCRIBE: Version for subscription dialog-info */
+
+	int destroyschedid;                       /*!< Destruction scheduler ID */
 	
 	struct sip_refer *refer;		/*!< REFER: SIP transfer data structure */
 	struct ast_dsp *vad;			/*!< Voice Activation Detection dsp */
@@ -1189,6 +1194,7 @@
 static const char *gettag(const struct sip_request *req, char *header, char *tagbuf, int tagbufsize);
 static int find_sip_method(const char *msg);
 static unsigned int parse_sip_options(struct sip_pvt *pvt, const char *supported);
+static void __sip_destroy(struct sip_pvt *p, int lockowner);
 static void parse_request(struct sip_request *req);
 static const char *get_header(const struct sip_request *req, const char *name);
 static void copy_request(struct sip_request *dst,struct sip_request *src);
@@ -1425,6 +1431,45 @@
 	return sip_debug_test_addr(sip_real_dst(p));
 }
 
+/*! \brief Scheduled item that checks to see if a SIP channel can be destroyed */
+static int __sip_check_destroy(void *data)
+{
+	struct sip_pvt *p = data;
+
+	ast_mutex_lock(&p->lock);
+	
+	p->destroyschedid = -1;
+
+	/* If the flag is not set... do not schedule this again */
+	if (!ast_test_flag(&p->flags[0], SIP_NEEDDESTROY)) {
+		ast_mutex_unlock(&p->lock);
+		return 0;
+	}
+
+	/* Check to make sure we CAN be destroyed */
+	if (AST_LIST_EMPTY(&p->packets) && !p->owner) {
+		ast_mutex_unlock(&p->lock);
+		__sip_destroy(p, 1);
+	} else {
+		/* Can't be destroyed and still need to be destroyed... reschedule */
+		p->destroyschedid = ast_sched_add(sched, 1000, __sip_check_destroy, p);
+		ast_mutex_unlock(&p->lock);
+	}
+
+	return 0;
+}
+
+/*! \brief Request that a SIP Pvt structure needs to be destroyed
+ * It is assumed you hold the lock if you call this */
+static void sip_need_destroy(struct sip_pvt *p)
+{
+	/* Set the flag to indicate this */
+	ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+	/* now finally schedule it */
+	p->destroyschedid = ast_sched_add(sched, 1000, __sip_check_destroy, p);
+	return;
+}
+
 /*! \brief Transmit SIP message */
 static int __sip_xmit(struct sip_pvt *p, char *data, int len)
 {
@@ -1604,7 +1649,7 @@
 			ast_channel_unlock(pkt->owner->owner);
 		} else {
 			/* If no channel owner, destroy now */
-			ast_set_flag(&pkt->owner->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(pkt->owner);
 		}
 	}
 
@@ -2174,6 +2219,63 @@
 	if (!u && realtime)
 		u = realtime_user(name);
 	return u;
+}
+
+/*! \brief Monitor the RTP status of a SIP dialog (timeouts/keepalives) */
+static int __sip_rtp_monitor(void *data)
+{
+	int reschedule = 1;
+	struct sip_pvt *p = data;
+	time_t t;
+
+	/* Get current time */
+	time(&t);
+
+	ast_mutex_lock(&p->lock);
+
+	/* Remove this scheduled item from pvt structure */
+	p->rtpschedid = -1;
+
+	/* Do actual check */
+	if (p->rtp && p->owner && (p->owner->_state == AST_STATE_UP) && !p->redirip.sin_addr.s_addr) {
+		if (p->lastrtptx && p->rtpkeepalive && t > p->lastrtptx + p->rtpkeepalive) {
+			/* Need to send an empty RTP packet */
+			time(&p->lastrtptx);
+			ast_rtp_sendcng(p->rtp, 0);
+		}
+		if (p->lastrtprx && (p->rtptimeout || p->rtpholdtimeout) && t > p->lastrtprx + p->rtptimeout) {
+			/* Might be a timeout now -- see if we're on hold */
+			struct sockaddr_in sin;
+			ast_rtp_get_peer(p->rtp, &sin);
+			if (sin.sin_addr.s_addr ||
+			    (p->rtpholdtimeout &&
+			     (t > p->lastrtprx + p->rtpholdtimeout))) {
+				/* Needs a hangup */
+				if (p->rtptimeout) {
+					while (p->owner && ast_channel_trylock(p->owner)) {
+						ast_mutex_unlock(&p->lock);
+						usleep(1);
+						ast_mutex_lock(&p->lock);
+					}
+					if (p->owner) {
+						ast_log(LOG_NOTICE, "Disconnecting call '%s' for lack of RTP activity in %ld seconds\n", p->owner->name, (long)(t - p->lastrtprx));
+						/* Issue a softhangup */
+						ast_softhangup(p->owner, AST_SOFTHANGUP_DEV);
+						ast_channel_unlock(p->owner);
+						reschedule = 0;
+					}
+				}
+			}
+		}
+	}
+
+	/* If we need to reschedule then do it */
+	if (reschedule)
+		p->rtpschedid = ast_sched_add(sched, 1000, __sip_rtp_monitor, p);
+
+	ast_mutex_unlock(&p->lock);
+
+	return 0;
 }
 
 /*! \brief Create address structure from peer reference */
@@ -2465,6 +2567,10 @@
 		ast_sched_del(sched, p->initid);
 	if (p->autokillid > -1)
 		ast_sched_del(sched, p->autokillid);
+	if (p->rtpschedid > -1)
+		ast_sched_del(sched, p->rtpschedid);
+	if (p->destroyschedid > -1)
+		ast_sched_del(sched, p->destroyschedid);
 
 	if (p->rtp)
 		ast_rtp_destroy(p->rtp);
@@ -2869,7 +2975,8 @@
 			}
 		}
 	}
-	ast_copy_flags(&p->flags[0], &locflags, SIP_NEEDDESTROY);	
+	ast_copy_flags(&p->flags[0], &locflags, SIP_NEEDDESTROY);
+	sip_need_destroy(p);
 	ast_mutex_unlock(&p->lock);
 	return 0;
 }
@@ -3480,6 +3587,8 @@
 	p->method = intended_method;
 	p->initid = -1;
 	p->autokillid = -1;
+	p->rtpschedid = -1;
+	p->destroyschedid = -1;
 	p->subscribed = NONE;
 	p->stateid = -1;
 	p->prefs = default_prefs;		/* Set default codecs for this call */
@@ -5735,7 +5844,7 @@
 		if (p->registry)
 			ASTOBJ_UNREF(p->registry, sip_registry_destroy);
 		r->call = NULL;
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 		/* Pretend to ACK anything just in case */
 		__sip_pretend_ack(p); /* XXX we need p locked, not sure we have */
 	}
@@ -7726,14 +7835,14 @@
 
 	if (strcmp(content_type, "text/plain")) { /* No text/plain attachment */
 		transmit_response(p, "415 Unsupported Media Type", req); /* Good enough, or? */
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+		sip_need_destroy(p);
 		return;
 	}
 
 	if (get_msg_text(buf, sizeof(buf), req)) {
 		ast_log(LOG_WARNING, "Unable to retrieve text from %s\n", p->callid);
 		transmit_response(p, "202 Accepted", req);
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+		sip_need_destroy(p);
 		return;
 	}
 
@@ -7752,7 +7861,7 @@
 		ast_log(LOG_WARNING,"Received message to %s from %s, dropped it...\n  Content-Type:%s\n  Message: %s\n", get_header(req,"To"), get_header(req,"From"), content_type, buf);
 		transmit_response(p, "405 Method Not Allowed", req); /* Good enough, or? */
 	}
-	ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+	sip_need_destroy(p);
 	return;
 }
 
@@ -9144,7 +9253,7 @@
 	
 		if (!p->owner) {	/* not a PBX call */
 			transmit_response(p, "481 Call leg/transaction does not exist", req);
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+			sip_need_destroy(p);
 			return;
 		}
 
@@ -9944,7 +10053,7 @@
 	/* Go ahead and send bye at this point */
 	if (ast_test_flag(&p->flags[0], SIP_PENDINGBYE)) {
 		transmit_request_with_auth(p, SIP_BYE, 0, XMIT_RELIABLE, TRUE);
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 		ast_clear_flag(&p->flags[0], SIP_NEEDREINVITE);	
 	} else if (ast_test_flag(&p->flags[0], SIP_NEEDREINVITE)) {
 		if (option_debug)
@@ -10072,7 +10181,7 @@
 			char *authorization = (resp == 401 ? "Authorization" : "Proxy-Authorization");
 			if ((p->authtries == MAX_AUTHTRIES) || do_proxy_auth(p, req, authenticate, authorization, SIP_INVITE, 1)) {
 				ast_log(LOG_NOTICE, "Failed to authenticate on INVITE to '%s'\n", get_header(&p->initreq, "From"));
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 				ast_set_flag(&p->flags[0], SIP_ALREADYGONE);	
 				if (p->owner)
 					ast_queue_control(p->owner, AST_CONTROL_CONGESTION);
@@ -10085,7 +10194,7 @@
 		ast_log(LOG_WARNING, "Received response: \"Forbidden\" from '%s'\n", get_header(&p->initreq, "From"));
 		if (!ast_test_flag(req, SIP_PKT_IGNORE) && p->owner)
 			ast_queue_control(p->owner, AST_CONTROL_CONGESTION);
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 		ast_set_flag(&p->flags[0], SIP_ALREADYGONE);	
 		break;
 	case 404: /* Not found */
@@ -10136,7 +10245,7 @@
 		if (ast_strlen_zero(p->authname)) {
 			ast_log(LOG_WARNING, "Asked to authenticate REFER to %s:%d but we have no matching peer or realm auth!\n",
 				ast_inet_ntoa(iabuf, sizeof(iabuf), p->recv.sin_addr), ntohs(p->recv.sin_port));
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+			sip_need_destroy(p);
 		}
 		if (resp == 401) {
 			auth = "WWW-Authenticate";
@@ -10144,7 +10253,7 @@
 		}
 		if ((p->authtries > 1) || do_proxy_auth(p, req, auth, auth2, SIP_REFER, 0)) {
 			ast_log(LOG_NOTICE, "Failed to authenticate on REFER to '%s'\n", get_header(&p->initreq, "From"));
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+			sip_need_destroy(p);
 		}
 		break;
 
@@ -10154,11 +10263,11 @@
 		/* Return to the current call onhold */
 		/* Status flag needed to be reset */
 		ast_log(LOG_NOTICE, "SIP transfer failed, call miserably fails. \n");
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+		sip_need_destroy(p);
 		break;
 	case 603:   /* Transfer declined */
 		ast_log(LOG_NOTICE, "SIP transfer declined, call fails. \n" );
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+		sip_need_destroy(p);
 		break;
 	}
 }
@@ -10174,42 +10283,42 @@
 	case 401:	/* Unauthorized */
 		if ((p->authtries == MAX_AUTHTRIES) || do_register_auth(p, req, "WWW-Authenticate", "Authorization")) {
 			ast_log(LOG_NOTICE, "Failed to authenticate on REGISTER to '%s@%s' (Tries %d)\n", p->registry->username, p->registry->hostname, p->authtries);
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
-			}
+			sip_need_destroy(p);
+		}
 		break;
 	case 403:	/* Forbidden */
 		ast_log(LOG_WARNING, "Forbidden - wrong password on authentication for REGISTER for '%s' to '%s'\n", p->registry->username, p->registry->hostname);
 		if (global_regattempts_max)
 			p->registry->regattempts = global_regattempts_max+1;
 		ast_sched_del(sched, r->timeout);
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 		break;
 	case 404:	/* Not found */
 		ast_log(LOG_WARNING, "Got 404 Not found on SIP register to service %s@%s, giving up\n", p->registry->username,p->registry->hostname);
 		if (global_regattempts_max)
 			p->registry->regattempts = global_regattempts_max+1;
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 		r->call = NULL;
 		ast_sched_del(sched, r->timeout);
 		break;
 	case 407:	/* Proxy auth */
 		if ((p->authtries == MAX_AUTHTRIES) || do_register_auth(p, req, "Proxy-Authenticate", "Proxy-Authorization")) {
 			ast_log(LOG_NOTICE, "Failed to authenticate on REGISTER to '%s' (tries '%d')\n", get_header(&p->initreq, "From"), p->authtries);
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 		}
 		break;
 	case 479:	/* SER: Not able to process the URI - address is wrong in register*/
 		ast_log(LOG_WARNING, "Got error 479 on register to %s@%s, giving up (check config)\n", p->registry->username,p->registry->hostname);
 		if (global_regattempts_max)
 			p->registry->regattempts = global_regattempts_max+1;
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 		r->call = NULL;
 		ast_sched_del(sched, r->timeout);
 		break;
 	case 200:	/* 200 OK */
 		if (!r) {
 			ast_log(LOG_WARNING, "Got 200 OK on REGISTER that isn't a register\n");
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 			return 0;
 		}
 
@@ -10323,7 +10432,7 @@
 
 		if (peer->pokeexpire > -1)
 			ast_sched_del(sched, peer->pokeexpire);
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 
 		/* Try again eventually */
 		if ((peer->lastms < 0)  || (peer->lastms > peer->maxms))
@@ -10392,7 +10501,7 @@
 			p->authtries = 0;	/* Reset authentication counter */
 			if (sipmethod == SIP_MESSAGE) {
 				/* We successfully transmitted a message */
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 			} else if (sipmethod == SIP_INVITE) {
 				handle_response_invite(p, resp, rest, req, seqno);
 			} else if (sipmethod == SIP_NOTIFY) {
@@ -10402,7 +10511,7 @@
 					ast_queue_hangup(p->owner);
 				} else {
 					if (p->subscribed == NONE) {
-						ast_set_flag(&p->flags[0], SIP_NEEDDESTROY); 
+						sip_need_destroy(p);
 					}
 				}
 			} else if (sipmethod == SIP_REGISTER)
@@ -10421,7 +10530,7 @@
 				res = handle_response_register(p, resp, rest, req, ignore, seqno);
 			else {
 				ast_log(LOG_WARNING, "Got authentication request (401) on unknown %s to '%s'\n", sip_methods[sipmethod].text, get_header(req, "To"));
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 			}
 			break;
 		case 403: /* Forbidden - we failed authentication */
@@ -10431,7 +10540,7 @@
 				res = handle_response_register(p, resp, rest, req, ignore, seqno);
 			else {
 				ast_log(LOG_WARNING, "Forbidden - maybe wrong password on authentication for %s\n", msg);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 			}
 			break;
 		case 404: /* Not found */
@@ -10450,16 +10559,17 @@
 			else if (p->registry && sipmethod == SIP_REGISTER)
 				res = handle_response_register(p, resp, rest, req, ignore, seqno);
 			else if (sipmethod == SIP_BYE) {
-				if (ast_strlen_zero(p->authname))
+				if (ast_strlen_zero(p->authname)) {
 					ast_log(LOG_WARNING, "Asked to authenticate %s, to %s:%d but we have no matching peer!\n",
-							msg, ast_inet_ntoa(iabuf, sizeof(iabuf), p->recv.sin_addr), ntohs(p->recv.sin_port));
-					ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+						msg, ast_inet_ntoa(iabuf, sizeof(iabuf), p->recv.sin_addr), ntohs(p->recv.sin_port));
+					sip_need_destroy(p);
+				}
 				if ((p->authtries == MAX_AUTHTRIES) || do_proxy_auth(p, req, "Proxy-Authenticate", "Proxy-Authorization", sipmethod, 0)) {
 					ast_log(LOG_NOTICE, "Failed to authenticate on %s to '%s'\n", msg, get_header(&p->initreq, "From"));
-					ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+					sip_need_destroy(p);
 				}
 			} else	/* We can't handle this, giving up in a bad way */
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 
 			break;
 		case 481: /* Call leg does not exist */
@@ -10477,7 +10587,7 @@
 				ast_log(LOG_WARNING, "Remote host can't match request %s to call '%s'. Giving up.\n", sip_methods[sipmethod].text, p->callid);
 				if (owner)
 					ast_queue_control(p->owner, AST_CONTROL_CONGESTION);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+				sip_need_destroy(p);
 			} else if (sipmethod == SIP_BYE) {
 				/* The other side has no transaction to bye,
 				just assume it's all right then */
@@ -10495,8 +10605,8 @@
 			if (sipmethod == SIP_INVITE)
 				handle_response_invite(p, resp, rest, req, seqno);
 			else {
-				ast_log(LOG_DEBUG, "Got 491 on %s, unspported. Call ID %s\n", sip_methods[sipmethod].text, p->callid);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				ast_log(LOG_DEBUG, "Got 491 on %s, unsupported. Call ID %s\n", sip_methods[sipmethod].text, p->callid);
+				sip_need_destroy(p);
 			}
 			break;
 		case 501: /* Not Implemented */
@@ -10581,7 +10691,7 @@
 					transmit_request(p, SIP_ACK, seqno, XMIT_UNRELIABLE, FALSE);
 				ast_set_flag(&p->flags[0], SIP_ALREADYGONE);	
 				if (!p->owner)
-					ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+					sip_need_destroy(p);
 			} else if ((resp >= 100) && (resp < 200)) {
 				if (sipmethod == SIP_INVITE) {
 					sip_cancel_destroy(p);
@@ -10619,7 +10729,7 @@
 				/* Wait for 487, then destroy */
 			} else if (sipmethod == SIP_MESSAGE)
 				/* We successfully transmitted a message */
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 			break;
 		case 202:   /* Transfer accepted */
 			if (sipmethod == SIP_REFER) 
@@ -10638,7 +10748,7 @@
 				auth2 = (resp == 407 ? "Proxy-Authorization" : "Authorization");
 				if ((p->authtries == MAX_AUTHTRIES) || do_proxy_auth(p, req, auth, auth2, sipmethod, 0)) {
 					ast_log(LOG_NOTICE, "Failed to authenticate on %s to '%s'\n", msg, get_header(&p->initreq, "From"));
-					ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+					sip_need_destroy(p);
 				}
 			}
 			break;
@@ -10941,7 +11051,7 @@
 		/* Here's room to implement incoming voicemail notifications :-) */
 		transmit_response(p, "489 Bad event", req);
 		if (!p->lastinvite) 
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 		return -1;
 	} else {
 		/* Handle REFER notifications */
@@ -10961,7 +11071,7 @@
 		if (strncasecmp(get_header(req, "Content-Type"), "message/sipfrag", strlen("message/sipfrag"))) {
 			/* We need a sipfrag */
 			transmit_response(p, "400 Bad request", req);
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 			return -1;
 		}
 
@@ -10969,7 +11079,7 @@
 		if (get_msg_text(buf, sizeof(buf), req)) {
 			ast_log(LOG_WARNING, "Unable to retrieve attachment from NOTIFY %s\n", p->callid);
 			transmit_response(p, "400 Bad request", req);
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 			return -1;
 		}
 
@@ -11053,7 +11163,7 @@
 	/* THis could be voicemail notification */
 	transmit_response(p, "200 OK", req);
 	if (!p->lastinvite) 
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 	return res;
 }
 
@@ -11074,7 +11184,7 @@
 	/* Destroy if this OPTIONS was the opening request, but not if
 	   it's in the middle of a normal call flow. */
 	if (!p->lastinvite)
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 
 	return res;
 }
@@ -11104,7 +11214,7 @@
 			transmit_response_with_unsupported(p, "420 Bad extension", req, required);
 			ast_log(LOG_WARNING,"Received SIP INVITE with unsupported required extension: %s\n", required);
 			if (!p->lastinvite)
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 			return -1;
 			
 		}
@@ -11150,7 +11260,7 @@
 				if (process_sdp(p, req)) {
 					transmit_response(p, "488 Not acceptable here", req);
 					if (!p->lastinvite)
-						ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+						sip_need_destroy(p);
 					return -1;
 				}
 			} else {
@@ -11171,7 +11281,7 @@
 		if (res < 0) { /* Something failed in authentication */
 			ast_log(LOG_NOTICE, "Failed to authenticate user %s\n", get_header(req, "From"));
 			transmit_response_reliable(p, "403 Forbidden", req);
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+			sip_need_destroy(p);
 			ast_string_field_free(p, theirtag);
 			return 0;
 		}
@@ -11181,7 +11291,7 @@
 			if (process_sdp(p, req)) {
 				/* Unacceptable codecs */
 				transmit_response_reliable(p, "488 Not acceptable here", req);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 				if (option_debug)
 					ast_log(LOG_DEBUG, "No compatible codecs for this SIP call.\n");
 				return -1;
@@ -11207,7 +11317,7 @@
 			if (res < 0) {
 				ast_log(LOG_NOTICE, "Failed to place call for user %s, too many calls\n", p->username);
 				transmit_response_reliable(p, "480 Temporarily Unavailable (Call limit) ", req);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 			}
 			return 0;
 		}
@@ -11224,7 +11334,7 @@
 				transmit_response_reliable(p, "404 Not Found", req);
 				update_call_counter(p, DEC_CALL_LIMIT);
 			}
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);		
+			sip_need_destroy(p);
 		} else {
 			/* If no extension was specified, use the s one */
 			if (ast_strlen_zero(p->exten))
@@ -11333,14 +11443,14 @@
 					transmit_response(p, "488 Not Acceptable Here (codec error)", req);
 				else
 					transmit_response_reliable(p, "488 Not Acceptable Here (codec error)", req);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 			} else {
 				ast_log(LOG_NOTICE, "Unable to create/find SIP channel for this INVITE\n");
 				if (ast_test_flag(req, SIP_PKT_IGNORE))
 					transmit_response(p, "503 Unavailable", req);
 				else
 					transmit_response_reliable(p, "503 Unavailable", req);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 			}
 		}
 	}
@@ -11374,8 +11484,8 @@
 		transmit_response(p, "603 Declined (No dialog)", req);
 		if (!ast_test_flag(req, SIP_PKT_IGNORE)) {
 			append_history(p, "Xfer", "Refer failed. Outside of dialog.");
-			ast_set_flag(&p->flags[0], SIP_ALREADYGONE);	
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			ast_set_flag(&p->flags[0], SIP_ALREADYGONE);
+			sip_need_destroy(p);
 		}
 		return 0;
 	}	
@@ -11466,7 +11576,7 @@
 	if (p->owner)
 		ast_queue_hangup(p->owner);
 	else
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 	if (p->initreq.len > 0) {
 		transmit_response_reliable(p, "487 Request Terminated", &p->initreq);
 		transmit_response(p, "200 OK", req);
@@ -11524,7 +11634,7 @@
 	} else if (p->owner)
 		ast_queue_hangup(p->owner);
 	else
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 	transmit_response(p, "200 OK", req);
 
 	return 1;
@@ -11575,7 +11685,7 @@
 	*/
 	if (!global_allowsubscribe) {
  		transmit_response(p, "403 Forbidden (policy)", req);
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 		return 0;
 	}
 
@@ -11600,7 +11710,7 @@
 	if (res) {
 		if (res < 0) {
 			ast_log(LOG_NOTICE, "Failed to authenticate user %s for SUBSCRIBE\n", get_header(req, "From"));
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 		}
 		return 0;
 	}
@@ -11608,7 +11718,7 @@
 	/* Check if this user/peer is allowed to subscribe at all */
 	if (!ast_test_flag(&p->flags[1], SIP_PAGE2_ALLOWSUBSCRIBE)) {
 		transmit_response(p, "403 Forbidden (policy)", req);
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+		sip_need_destroy(p);
 		return 0;
 	}
 
@@ -11623,7 +11733,7 @@
 	build_contact(p);
 	if (gotdest) {
 		transmit_response(p, "404 Not Found", req);
-		ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+		sip_need_destroy(p);
 		return 0;
 	} else {
 		/* Initialize tag for new subscriptions */	
@@ -11647,7 +11757,7 @@
 			} else {
  				/* Can't find a format for events that we know about */
  				transmit_response(p, "489 Bad Event", req);
- 				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
  				return 0;
  			}
  		} else if (!strcmp(event, "message-summary")) { 
@@ -11656,7 +11766,7 @@
 				transmit_response(p, "406 Not Acceptable", req);
 				if (option_debug > 1)
 					ast_log(LOG_DEBUG, "Received SIP mailbox subscription for unknown format: %s\n", accept);
- 				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 				return 0;
 			}
 			/* Looks like they actually want a mailbox status 
@@ -11666,7 +11776,7 @@
 			*/
 			if (!authpeer || ast_strlen_zero(authpeer->mailbox)) {
 				transmit_response(p, "404 Not found (no mailbox)", req);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 				ast_log(LOG_NOTICE, "Received SIP subscribe for peer without mailbox: %s\n", authpeer->name);
 				return 0;
 			}
@@ -11681,7 +11791,7 @@
 			transmit_response(p, "489 Bad Event", req);
 			if (option_debug > 1)
 				ast_log(LOG_DEBUG, "Received SIP subscribe for unknown event package: %s\n", event);
- 			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 			return 0;
 		}
 		if (p->subscribed != MWI_NOTIFICATION && !resubscribe)
@@ -11721,7 +11831,7 @@
 			if ((firststate = ast_extension_state(NULL, p->context, p->exten)) < 0) {
 				ast_log(LOG_ERROR, "Got SUBSCRIBE for extension without hint. Please add hint to %s in context %s\n", p->exten, p->context);
 				transmit_response(p, "404 Not found", req);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+				sip_need_destroy(p);
 				return 0;
 			} else {
 				struct sip_pvt *p_old;
@@ -11749,7 +11859,7 @@
 					if (!strcmp(p_old->username, p->username)) {
 						if (!strcmp(p_old->exten, p->exten) &&
 						    !strcmp(p_old->context, p->context)) {
-							ast_set_flag(&p_old->flags[0], SIP_NEEDDESTROY);
+							sip_need_destroy(p_old);
 							ast_mutex_unlock(&p_old->lock);
 							break;
 						}
@@ -11760,7 +11870,7 @@
 			}
 		}
 		if (!p->expiry)
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+			sip_need_destroy(p);
 	}
 	if (authpeer)
 		ASTOBJ_UNREF(authpeer, sip_destroy_peer);
@@ -11827,7 +11937,7 @@
 	}
 	if (error) {
 		if (!p->initreq.header)	/* New call */
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	/* Make sure we destroy this dialog */
+			sip_need_destroy(p);
 		return -1;
 	}
 	/* Get the command XXX */
@@ -11846,7 +11956,7 @@
 		if (!p->initreq.headers) {
 			if (option_debug)
 				ast_log(LOG_DEBUG, "That's odd...  Got a response on a call we dont know about. Cseq %d Cmd %s\n", seqno, cmd);
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 			return 0;
 		} else if (p->ocseq && (p->ocseq < seqno)) {
 			if (option_debug)
@@ -11928,7 +12038,7 @@
 				/* Will cease to exist after ACK */
 			} else if (req->method != SIP_ACK) {
 				transmit_response(p, "481 Call/Transaction Does Not Exist", req);
-				ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);
+				sip_need_destroy(p);
 			}
 			return res;
 		}
@@ -11983,7 +12093,7 @@
 			check_pendings(p);
 		}
 		if (!p->lastinvite && ast_strlen_zero(p->randdata))
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 		break;
 	default:
 		transmit_response_with_allow(p, "501 Method Not Implemented", req, 0);
@@ -11991,7 +12101,7 @@
 			cmd, ast_inet_ntoa(iabuf, sizeof(iabuf), p->sa.sin_addr));
 		/* If this is some new method, and we don't have a call, destroy it now */
 		if (!p->initreq.headers)
-			ast_set_flag(&p->flags[0], SIP_NEEDDESTROY);	
+			sip_need_destroy(p);
 		break;
 	}
 	return res;
@@ -12216,6 +12326,27 @@
 	return FALSE;
 }
 
+/*! \brief MWI Notification Thread */
+static void *do_mwi_monitor(void *data)
+{
+
+	for (;;) {
+		/* See if we are being cancelled */
+		pthread_testcancel();
+		/* Traverse the list looking for peers that need MWI to be sent */
+		ASTOBJ_CONTAINER_TRAVERSE(&peerl, 1, do {
+						  ASTOBJ_WRLOCK(iterator);
+						  if (does_peer_need_mwi(iterator))
+							  sip_send_mwi_to_peer(iterator);
+						  ASTOBJ_UNLOCK(iterator);
+					  } while (0)
+			);
+		/* Sleep for a wee bit until we do this again */
+		usleep(1000);
+	}
+
+	return NULL;
+}
 
 /*! \brief The SIP monitoring thread 
 \note	This thread monitors all the SIP sessions and peers that needs notification of mwi
@@ -12224,12 +12355,6 @@
 static void *do_monitor(void *data)
 {
 	int res;
-	struct sip_pvt *sip = NULL;
-	struct sip_peer *peer = NULL;
-	time_t t;
-	int fastrestart = FALSE;
-	int lastpeernum = -1;
-	int curpeernum;
 	int reloading;
 
 	/* Add an I/O event to our SIP UDP socket */
@@ -12243,100 +12368,26 @@
 		reloading = sip_reloading;
 		sip_reloading = FALSE;
 		ast_mutex_unlock(&sip_reload_lock);
-		if (reloading) {
+		if (reloading == TRUE) {
 			if (option_verbose > 0)
 				ast_verbose(VERBOSE_PREFIX_1 "Reloading SIP\n");
 			sip_do_reload(sip_reloadreason);
-		}
-		/* Check for interfaces needing to be killed */
-		AST_LIST_LOCK(&iflist);
-restartsearch:		
-		time(&t);
-		AST_LIST_TRAVERSE(&iflist, sip, entry) {
-			ast_mutex_lock(&sip->lock);
-			/* Check RTP timeouts and kill calls if we have a timeout set and do not get RTP */
-			if (sip->rtp && sip->owner && (sip->owner->_state == AST_STATE_UP) && !sip->redirip.sin_addr.s_addr) {
-				if (sip->lastrtptx && sip->rtpkeepalive && t > sip->lastrtptx + sip->rtpkeepalive) {
-					/* Need to send an empty RTP packet */
-					time(&sip->lastrtptx);
-					ast_rtp_sendcng(sip->rtp, 0);
-				}
-				if (sip->lastrtprx && (sip->rtptimeout || sip->rtpholdtimeout) && t > sip->lastrtprx + sip->rtptimeout) {
-					/* Might be a timeout now -- see if we're on hold */
-					struct sockaddr_in sin;
-					ast_rtp_get_peer(sip->rtp, &sin);
-					if (sin.sin_addr.s_addr || 
-							(sip->rtpholdtimeout && 
-							  (t > sip->lastrtprx + sip->rtpholdtimeout))) {
-						/* Needs a hangup */
-						if (sip->rtptimeout) {
-							while(sip->owner && ast_channel_trylock(sip->owner)) {
-								ast_mutex_unlock(&sip->lock);
-								usleep(1);
-								ast_mutex_lock(&sip->lock);
-							}
-							if (sip->owner) {
-								ast_log(LOG_NOTICE, "Disconnecting call '%s' for lack of RTP activity in %ld seconds\n", sip->owner->name, (long)(t - sip->lastrtprx));
-								/* Issue a softhangup */
-								ast_softhangup(sip->owner, AST_SOFTHANGUP_DEV);
-								ast_channel_unlock(sip->owner);
-							}
-						}
-					}
-				}
-			}
-			/* If we have sessions that needs to be destroyed, do it now */
-			if (ast_test_flag(&sip->flags[0], SIP_NEEDDESTROY) && AST_LIST_EMPTY(&sip->packets) && !sip->owner) {
-				ast_mutex_unlock(&sip->lock);
-				__sip_destroy(sip, 1);
-				goto restartsearch;
-			}
-			ast_mutex_unlock(&sip->lock);
-		}
-		AST_LIST_UNLOCK(&iflist);
-
-		pthread_testcancel();
+		} else if (reloading == 3) {
+			/* Our thread is being terminated, so goodbye */
+			break;
+		}
 		/* Wait for sched or io */
 		res = ast_sched_wait(sched);
 		if ((res < 0) || (res > 1000))
 			res = 1000;
-		/* If we might need to send more mailboxes, don't wait long at all.*/
-		if (fastrestart)
-			res = 1;
 		res = ast_io_wait(io, res);
 		if (option_debug && res > 20)
 			ast_log(LOG_DEBUG, "chan_sip: ast_io_wait ran %d all at once\n", res);
-		ast_mutex_lock(&monlock);
 		if (res >= 0)  {
 			res = ast_sched_runq(sched);
 			if (option_debug && res >= 20)
 				ast_log(LOG_DEBUG, "chan_sip: ast_sched_runq ran %d all at once\n", res);
 		}
-
-		/* Send MWI notifications to peers - static and cached realtime peers */
-		time(&t);
-		fastrestart = FALSE;
-		curpeernum = 0;
-		peer = NULL;
-		ASTOBJ_CONTAINER_TRAVERSE(&peerl, !peer, do {
-			if ((curpeernum > lastpeernum) && does_peer_need_mwi(iterator)) {
-				fastrestart = TRUE;
-				lastpeernum = curpeernum;
-				peer = ASTOBJ_REF(iterator);
-			};
-			curpeernum++;
-		} while (0)
-		);
-		if (peer) {
-			ASTOBJ_WRLOCK(peer);
-			sip_send_mwi_to_peer(peer);
-			ASTOBJ_UNLOCK(peer);
-			ASTOBJ_UNREF(peer,sip_destroy_peer);
-		} else {
-			/* Reset where we come from */
-			lastpeernum = -1;
-		}
-		ast_mutex_unlock(&monlock);
 	}
 	/* Never reached */
 	return NULL;
@@ -12346,27 +12397,16 @@
 /*! \brief Start the channel monitor thread */
 static int restart_monitor(void)
 {
-	/* If we're supposed to be stopped -- stay stopped */
-	if (monitor_thread == AST_PTHREADT_STOP)
-		return 0;
-	ast_mutex_lock(&monlock);
-	if (monitor_thread == pthread_self()) {
-		ast_mutex_unlock(&monlock);
-		ast_log(LOG_WARNING, "Cannot kill myself\n");
-		return -1;
-	}
 	if (monitor_thread != AST_PTHREADT_NULL) {
 		/* Wake up the thread */
 		pthread_kill(monitor_thread, SIGURG);
 	} else {
 		/* Start a new monitor */
 		if (ast_pthread_create(&monitor_thread, NULL, do_monitor, NULL) < 0) {
-			ast_mutex_unlock(&monlock);
 			ast_log(LOG_ERROR, "Unable to start monitor thread.\n");
 			return -1;
 		}
 	}
-	ast_mutex_unlock(&monlock);
 	return 0;
 }
 
@@ -12583,6 +12623,10 @@
 	build_via(p);
 	build_callid_pvt(p);
 	
+        /* If any RTP options are set, start up a scheduled item to periodically check them */
+        if (p->rtptimeout || p->rtpholdtimeout || p->rtpkeepalive)
+                p->rtpschedid = ast_sched_add(sched, 1000, __sip_rtp_monitor, p);
+
 	/* We have an extension to call, don't use the full contact here */
 	/* This to enable dialing registered peers with extension dialling,
 	   like SIP/peername/extension 	
@@ -13689,7 +13733,6 @@
 	if (!ntohs(bindaddr.sin_port))
 		bindaddr.sin_port = ntohs(DEFAULT_SIP_PORT);
 	bindaddr.sin_family = AF_INET;
-	ast_mutex_lock(&netlock);
 	if ((sipsock > -1) && (memcmp(&old_bindaddr, &bindaddr, sizeof(struct sockaddr_in)))) {
 		close(sipsock);
 		sipsock = -1;
@@ -13722,7 +13765,6 @@
 			}
 		}
 	}
-	ast_mutex_unlock(&netlock);
 
 	/* Add default domains - host name, IP address and IP:port */
 	/* Only do this if user added any sip domain with "localdomains" */
@@ -14202,6 +14244,11 @@
 
 	sip_poke_all_peers();	
 	sip_send_all_registers();
+
+	/* Start the MWI thread which is always running */
+	if (ast_pthread_create(&mwi_thread, NULL, do_mwi_monitor, NULL) < 0) {
+		ast_log(LOG_ERROR, "Failed to create MWI notification thread -- MWI will be unavailable\n");
+	}
 	
 	/* And start the monitor for the first time */
 	restart_monitor();
@@ -14232,6 +14279,12 @@
 	ast_manager_unregister("SIPpeers");
 	ast_manager_unregister("SIPshowpeer");
 
+	/* Stop MWI thread */
+	if (mwi_thread != AST_PTHREADT_NULL) {
+		pthread_cancel(mwi_thread);
+		pthread_kill(mwi_thread, SIGURG);
+		pthread_join(mwi_thread, NULL);
+	}
 
 	/* Stop idle and active threads */
 	AST_LIST_LOCK(&idle_list);
@@ -14269,27 +14322,17 @@
 	}
 	AST_LIST_UNLOCK(&iflist);
 
-	if (!ast_mutex_lock(&monlock)) {
-		if (monitor_thread && (monitor_thread != AST_PTHREADT_STOP)) {
-			pthread_cancel(monitor_thread);
-			pthread_kill(monitor_thread, SIGURG);
-			pthread_join(monitor_thread, NULL);
-		}
-		monitor_thread = AST_PTHREADT_STOP;
-		ast_mutex_unlock(&monlock);
-	} else {
-		ast_log(LOG_WARNING, "Unable to lock the monitor\n");
-		return -1;
-	}
-
-	ast_mutex_lock(&monlock);
-	if (monitor_thread && (monitor_thread != AST_PTHREADT_STOP)) {
-		pthread_cancel(monitor_thread);
+	/* Kill the main monitor thread */
+	if (monitor_thread != AST_PTHREADT_NULL) {
+		/* Signal this */
+		ast_mutex_lock(&sip_reload_lock);
+		sip_reloading = 3;
+		ast_mutex_unlock(&sip_reload_lock);
+		/* Wake them up if needed */
 		pthread_kill(monitor_thread, SIGURG);
+		/* Wait for it to be done */
 		pthread_join(monitor_thread, NULL);
 	}
-	monitor_thread = AST_PTHREADT_STOP;
-	ast_mutex_unlock(&monlock);
 
 	AST_LIST_LOCK(&iflist);
 	AST_LIST_TRAVERSE(&iflist, p, entry) {



More information about the asterisk-commits mailing list