[asterisk-commits] mjordan: branch mjordan/twisted_process r3099 - /asterisk/team/mjordan/twiste...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Mar 19 08:29:09 CDT 2012
Author: mjordan
Date: Mon Mar 19 08:29:03 2012
New Revision: 3099
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=3099
Log:
Update subprocess to reactor with changes from weekend
Started SIPp migration, which is going to be tricky, since twisted
doesn't appear to like opening it.
Modified:
asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py
asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py
asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py
Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py?view=diff&rev=3099&r1=3098&r2=3099
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py Mon Mar 19 08:29:03 2012
@@ -131,6 +131,7 @@
).addErrback(self.ami_logoff)
def ami_test_done(self, ami, event):
+ logger.debug(str(event))
if event.get("event") == "Hangup":
self.check_active_channels()
Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py?view=diff&rev=3099&r1=3098&r2=3099
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py Mon Mar 19 08:29:03 2012
@@ -22,8 +22,7 @@
from config import ConfigFile
from version import AsteriskVersion
-from twisted.internet import reactor, protocol, threads, defer
-from twisted.internet import utils
+from twisted.internet import reactor, protocol, defer, utils
logger = logging.getLogger(__name__)
@@ -33,11 +32,16 @@
"""
def __init__(self, host, cmd):
- """
- Create a new Asterisk CLI Protocol instance
-
+ """ Create a new Asterisk CLI Protocol instance
+
+ This class wraps an Asterisk instance that executes a CLI command
+ against another instance of Asterisk.
+
+ Keyword Arguments:
host The host this CLI instance will connect to
- cmd The command to spawn (Asterisk instance, with command line parameters)
+ cmd List of command arguments to spawn. The first argument must be
+ the location of the Asterisk executable; each subsequent argument should define
+ the CLI command to run and the instance of Asterisk to run it against.
"""
self.__host = host
self.__cmd = cmd
@@ -46,14 +50,13 @@
self.err = ""
def execute(self):
- """
- Execute the CLI command.
+ """ Execute the CLI command.
Returns a deferred that will be called when the operation completes. The
parameter to the deferred is this object.
"""
def __cli_output_callback(result):
- """ Handle regular output from the Asterisk CLI instance """
+ """ Callback from getProcessOutputAndValue """
self.__set_properties(result)
logger.debug("Asterisk CLI %s exited %d" % (self.__host, self.exitcode))
if self.exitcode:
@@ -62,9 +65,10 @@
self.__deferred.callback(self)
def __cli_error_callback(result):
- """ Handle an error coming back from the Asterisk CLI instance """
+ """ Errback from getProcessOutputAndValue """
self.__set_properties(result)
- logger.warning("Asterisk CLI %s exited %d with error: %s" % (self.__host, self.exitcode, self.err))
+ logger.warning("Asterisk CLI %s exited %d with error: %s"
+ % (self.__host, self.exitcode, self.err))
self.__deferred.errback(self)
self.__deferred = defer.Deferred()
@@ -207,8 +211,7 @@
""" Start this instance of Asterisk.
Returns:
- A deferred object that is called when Asterisk is fully booted, or an
- error if Asterisk fails to start
+ A deferred object that will be called when Asterisk is fully booted.
Example Usage:
asterisk.start()
@@ -218,9 +221,11 @@
"""
def __wait_fully_booted_callback(cli_command):
+ """ Callback for CLI command waitfullybooted """
self.__start_deferred.callback("Successfully started Asterisk %s" % self.host)
def __wait_fully_booted_error(cli_command):
+ """ Errback for CLI command waitfullybooted """
if time.time() - self.__start_asterisk_time > 5:
logger.error("Asterisk core waitfullybooted for %s failed" % self.host)
self.__start_deferred.errback("Command core waitfullybooted failed")
@@ -239,6 +244,10 @@
"-C", "%s" % os.path.join(self.astetcdir, "asterisk.conf")
]
+ # Make the start/stop deferreds - this method will return
+ # the start deferred, and pass the stop deferred to the AsteriskProtocol
+ # object. The stop deferred will be raised when the Asterisk process
+ # exits
self.__start_deferred = defer.Deferred()
self.__stop_deferred = defer.Deferred()
self.processProtocol = AsteriskProtocol(self.host, self.__stop_deferred)
@@ -257,67 +266,73 @@
This function is used to stop this instance of Asterisk.
Returns a deferred that can be used to detect when Asterisk exits,
- or if it fails to exit
+ or if it fails to exit.
Example Usage:
asterisk.stop()
"""
def __send_stop_gracefully():
+ """ Send a core stop gracefully CLI command """
if self.ast_version < AsteriskVersion("1.6.0"):
cli_deferred = self.cli_exec("stop gracefully")
else:
cli_deferred = self.cli_exec("core stop gracefully")
- return cli_deferred
+ cli_deferred.addCallback(__stop_gracefully_callback)
+ cli_deferred.addErrback(__stop_gracefully_error)
+
+ def __stop_gracefully_callback(cli_command):
+ """ Callback handler for the core stop gracefully CLI command """
+ logger.debug("Successfully stopped Asterisk %s" % self.host)
+ self.__stop_attempts = 0
+
+ def __stop_gracefully_error(cli_command):
+ """ Errback for the core stop gracefully CLI command """
+ if self.__stop_attempts > 5:
+ self.__stop_attempts = 0
+ logger.warning("Asterisk graceful stop for %s failed" % self.host)
+ else:
+ logger.debug("Asterisk graceful stop failed, attempting again...")
+ self.__stop_attemps += 1
+ __send_stop_gracefully()
def __send_stop_now():
+ """ Send a core stop now CLI command """
if self.ast_version < AsteriskVersion("1.6.0"):
cli_deferred = self.cli_exec("stop now")
else:
cli_deferred = self.cli_exec("core stop now")
- return cli_deferred
-
- def __stop_gracefully_callback(cli_command):
+ cli_deferred.addCallback(__stop_now_callback)
+ cli_deferred.addErrback(__stop_now_error)
+
+ def __stop_now_callback(cli_command):
+ """ Callback handler for the core stop now CLI command """
logger.debug("Successfully stopped Asterisk %s" % self.host)
-
- def __stop_gracefully_error(cli_command):
- if time.time() - self.__stop_time > 5:
- logger.warning("Asterisk graceful stop for %s failed, attempting stop now" % self.host)
- self.__stop_time = time.time()
+ self.__stop_attempts = 0
+
+ def __stop_now_error(cli_command):
+ """ Errback handler for the core stop now CLI command """
+ if self.__stop_attempts > 5:
+ self.__stop_attempts = 0
+ logger.warning("Asterisk graceful stop for %s failed" % self.host)
+ else:
+ logger.debug("Asterisk stop now failed, attempting again...")
+ self.__stop_attempts += 1
cli_deferred = __send_stop_now()
cli_deferred.addCallback(__stop_now_callback)
cli_deferred.addErrback(__stop_now_error)
- else:
- logger.debug("Asterisk graceful stop failed, attempting again...")
- cli_deferred = __send_stop_gracefully()
- cli_deferred.addCallback(__stop_gracefully_callback)
- cli_deferred.addErrback(__stop_gracefully_error)
-
- def __stop_now_callback(cli_command):
- logger.debug("Successfully stopped Asterisk %s" % self.host)
-
- def __stop_now_error(cli_command):
- if time.time() - self.__stop_time > 5:
- logger.warning("Asterisk stop now for %s failed, attempting TERM" % self.host)
- # If even a "stop now" didn't do the trick, fall back to sending
- # signals to the process. First, send a SIGTERM. If it _STILL_ hasn't
- # gone away after another 5 seconds, send SIGKILL.
- try:
- self.process.signalProcess("TERM")
- if not self.processProtocol.exited:
- reactor.callLater(5, __check_process_and_kill)
- except twisted.internet.error.ProcessExitedAlready:
- # Probably that we sent a signal to a process that was already
- # dead. Just ignore it.
- pass
- logger.debug("Successfully terminated Asterisk %s" % self.host)
- else:
- logger.debug("Asterisk stop now failed, attempting again...")
- cli_deferred = __send_stop_now()
- cli_deferred.addCallback(__stop_now_callback)
- cli_deferred.addErrback(__stop_now_error)
-
- def __check_process_and_kill():
+
+ def __send_term():
+ try:
+ logger.info("Sending TERM to Asterisk %s" % self.host)
+ self.process.signalProcess("TERM")
+ except twisted.internet.error.ProcessExitedAlready:
+ # Probably that we sent a signal to a process that was already
+ # dead. Just ignore it.
+ pass
+
+ def __send_kill():
+ """ Check to see if the process is running and kill it with fire """
try:
if not self.processProtocol.exited:
logger.info("Sending KILL to Asterisk %s" % self.host)
@@ -325,32 +340,36 @@
except twisted.internet.error.ProcessExitedAlready:
# Pass on this
pass
- # If you kill the process, the ProcessProtocol will never get the note that its dead
+ # If you kill the process, the ProcessProtocol will never get the note
+ # that it's dead. Call the stop callback to notify everyone that we did
+ # indeed kill the Asterisk instance.
try:
+ # Attempt to signal the process object that it should lose its
+ # connection - it may already be gone however.
self.process.loseConnection()
except:
pass
- self.__stop_deferred.callback("Asterisk %s KILLED" % self.host)
-
- def __cancel_kill(reason):
- try:
- self.__cancel_token.cancel()
- except ValueError:
- pass
+ self.__stop_deferred.errback("Asterisk %s KILLED" % self.host)
+
+ def __cancel_stops(reason):
+ """ Cancel all stop actions - called when the process exits """
+ for token in self.__stop_cancel_tokens:
+ try:
+ token.cancel()
+ except ValueError:
+ pass
return reason
+ self.__stop_cancel_tokens = []
+ self.__stop_attempts = 0
# Start by asking to stop gracefully.
- self.__stop_time = time.time()
- d = __send_stop_gracefully()
- d.addCallback(__stop_gracefully_callback)
- d.addErrback(__stop_gracefully_error)
-
- # There's a chance that we'll get back a 'yes, I'll stop' from Asterisk, and then it
- # just won't. Add a 10 second callback to the reactor to terminate the Asterisk instance
- # with prejudice if it lies to us. That should be more then enough time for one of the
- # previous methods to stop it in a nicer fashion.
- self.__cancel_token = reactor.callLater(10, __check_process_and_kill)
- self.__stop_deferred.addCallback(__cancel_kill)
+ __send_stop_gracefully()
+ # Schedule progressively more aggressive mechanisms of stopping Asterisk
+ self.__stop_cancel_tokens.append(reactor.callLater(5, __send_stop_now))
+ self.__stop_cancel_tokens.append(reactor.callLater(10, __send_term))
+ self.__stop_cancel_tokens.append(reactor.callLater(15, __send_kill))
+
+ self.__stop_deferred.addCallback(__cancel_stops)
return self.__stop_deferred
@@ -471,8 +490,11 @@
used. If no extension is given, the 's' extension will be used.
Keyword Arguments:
- blocking -- When True, do not return from this function until the CLI
- command finishes running. The default is True.
+ blocking -- This used to specify that we should block until the
+ CLI command finished executing. When the Asterisk process was turned
+ over to twisted, that's no longer the case. The keyword argument
+ was kept merely for backwards compatibility; callers should *not* expect
+ their calls to block.
Returns:
A deferred object that can be used to listen for command completion
@@ -508,8 +530,11 @@
Keyword Arguments:
cli_cmd -- The command to execute.
- blocking -- When True, the process is spawned as a blocking
- operation, at least as well as we can with twisted.
+ blocking -- This used to specify that we should block until the
+ CLI command finished executing. When the Asterisk process was turned
+ over to twisted, that's no longer the case. The keyword argument
+ was kept merely for backwards compatibility; callers should *not* expect
+ their calls to block.
Returns:
A deferred object that will be signaled when the process has exited
Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py?view=diff&rev=3099&r1=3098&r2=3099
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py Mon Mar 19 08:29:03 2012
@@ -14,13 +14,13 @@
import os
import subprocess
import logging
-
-from twisted.internet import reactor
+import TestSuiteUtils
+
+from twisted.internet import reactor, defer, utils, protocol
from asterisk import Asterisk
from TestCase import TestCase
logger = logging.getLogger(__name__)
-
class SIPpScenario:
"""
@@ -63,13 +63,45 @@
self.positional_args = tuple(positional_args) # don't allow caller to mangle his own list
self.test_dir = test_dir
self.default_port = 5061
- self.sipp = None
+ self.sipp = TestSuiteUtils.which("sipp")
+ self.passed = False
def run(self):
+ """ Execute a SIPp scenario
+
+ Execute the SIPp scenario that was passed to this object
+
+ Returns:
+ A deferred that can be used to determine when the SIPp Scenario
+ has exited.
+ """
+
+ def __output_callback(result):
+ """ Callback from getProcessOutputAndValue """
+ out, err, code = result
+ logger.debug(out)
+ logger.debug("Launching SIPp Scenario %s exited %d"
+ % (self.scenario['scenario'], code))
+ if (code == 0 or code != 99):
+ self.passed = True
+ logger.info("SIPp Scenario %s Started" % (self.scenario['scenario']))
+ self.__exit_deferred.callback(self)
+ else:
+ logger.warning("Launching SIPp Scenario %s Failed" % (self.scenario['scenario']))
+ self.__exit_deferred.errback(self)
+
+ def __error_callback(result):
+ """ Errback from getProcessOutputAndValue """
+ out, err, code = result
+ logger.warning("Launching of SIPp Scenario %s exited %d with error: %s"
+ % (self.scenario['scenario'], code, err))
+ self.__exit_deferred.errback(self)
+
sipp_args = [
- 'sipp', '127.0.0.1',
+ self.sipp, '127.0.0.1',
'-sf', '%s/sipp/%s' % (self.test_dir, self.scenario['scenario']),
- '-nostdin'
+ '-nostdin', '-bg', '-trace_msg',
+ '-skip_rlimit',
]
default_args = {
'-p' : self.default_port,
@@ -87,27 +119,14 @@
sipp_args.extend(self.positional_args)
logger.info("Executing SIPp scenario: %s" % self.scenario['scenario'])
- logger.info(sipp_args)
-
- self.sipp = subprocess.Popen(sipp_args,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
- def waitAndEvaluate(self):
-
- (out, err) = self.sipp.communicate()
-
- result = self.sipp.wait()
- logger.debug(out)
- if result:
- logger.warn("SIPp scenario FAILED")
- passed = False
- logger.warn(err)
- else:
- logger.info("SIPp scenario PASSED")
- passed = True
-
- return passed
-
+ logger.debug(sipp_args)
+
+ self.__exit_deferred = defer.Deferred()
+
+ df = utils.getProcessOutputAndValue(sipp_args[0], sipp_args)
+ df.addCallback(__output_callback)
+ df.addErrback(__error_callback)
+ return self.__exit_deferred
class SIPpTest(TestCase):
"""
More information about the asterisk-commits
mailing list