[asterisk-commits] mjordan: branch mjordan/twisted_process r3099 - /asterisk/team/mjordan/twiste...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Mar 19 08:29:09 CDT 2012


Author: mjordan
Date: Mon Mar 19 08:29:03 2012
New Revision: 3099

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=3099
Log:
Update subprocess to reactor with changes from weekend

Started SIPp migration, which is going to be tricky, since twisted
doesn't appear to like opening it.

Modified:
    asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py
    asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py
    asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py?view=diff&rev=3099&r1=3098&r2=3099
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py Mon Mar 19 08:29:03 2012
@@ -131,6 +131,7 @@
         ).addErrback(self.ami_logoff)
 
     def ami_test_done(self, ami, event):
+        logger.debug(str(event))
         if event.get("event") == "Hangup":
             self.check_active_channels()
 

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py?view=diff&rev=3099&r1=3098&r2=3099
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py Mon Mar 19 08:29:03 2012
@@ -22,8 +22,7 @@
 from config import ConfigFile
 from version import AsteriskVersion
 
-from twisted.internet import reactor, protocol, threads, defer
-from twisted.internet import utils
+from twisted.internet import reactor, protocol, defer, utils
 
 logger = logging.getLogger(__name__)
 
@@ -33,11 +32,16 @@
     """
 
     def __init__(self, host, cmd):
-        """
-        Create a new Asterisk CLI Protocol instance
-
+        """ Create a new Asterisk CLI Protocol instance
+
+        This class wraps an Asterisk instance that executes a CLI command
+        against another instance of Asterisk.
+
+        Keyword Arguments:
         host    The host this CLI instance will connect to
-        cmd     The command to spawn (Asterisk instance, with command line parameters)
+        cmd     List of command arguments to spawn.  The first argument must be
+        the location of the Asterisk executable; each subsequent argument should define
+        the CLI command to run and the instance of Asterisk to run it against.
         """
         self.__host = host
         self.__cmd = cmd
@@ -46,14 +50,13 @@
         self.err = ""
 
     def execute(self):
-        """
-        Execute the CLI command.
+        """ Execute the CLI command.
 
         Returns a deferred that will be called when the operation completes.  The
         parameter to the deferred is this object.
         """
         def __cli_output_callback(result):
-            """ Handle regular output from the Asterisk CLI instance """
+            """ Callback from getProcessOutputAndValue """
             self.__set_properties(result)
             logger.debug("Asterisk CLI %s exited %d" % (self.__host, self.exitcode))
             if self.exitcode:
@@ -62,9 +65,10 @@
                 self.__deferred.callback(self)
     
         def __cli_error_callback(result):
-            """ Handle an error coming back from the Asterisk CLI instance """
+            """ Errback from getProcessOutputAndValue """
             self.__set_properties(result)
-            logger.warning("Asterisk CLI %s exited %d with error: %s" % (self.__host, self.exitcode, self.err))
+            logger.warning("Asterisk CLI %s exited %d with error: %s"
+                % (self.__host, self.exitcode, self.err))
             self.__deferred.errback(self)
 
         self.__deferred = defer.Deferred()
@@ -207,8 +211,7 @@
         """ Start this instance of Asterisk.
 
         Returns:
-        A deferred object that is called when Asterisk is fully booted, or an
-        error if Asterisk fails to start
+        A deferred object that will be called when Asterisk is fully booted.
 
         Example Usage:
         asterisk.start()
@@ -218,9 +221,11 @@
         """
 
         def __wait_fully_booted_callback(cli_command):
+            """ Callback for CLI command waitfullybooted """
             self.__start_deferred.callback("Successfully started Asterisk %s" % self.host)
     
         def __wait_fully_booted_error(cli_command):
+            """ Errback for CLI command waitfullybooted """
             if time.time() - self.__start_asterisk_time > 5:
                 logger.error("Asterisk core waitfullybooted for %s failed" % self.host)
                 self.__start_deferred.errback("Command core waitfullybooted failed")
@@ -239,6 +244,10 @@
             "-C", "%s" % os.path.join(self.astetcdir, "asterisk.conf")
         ]
 
+        # Make the start/stop deferreds - this method will return
+        # the start deferred, and pass the stop deferred to the AsteriskProtocol
+        # object.  The stop deferred will be raised when the Asterisk process
+        # exits
         self.__start_deferred = defer.Deferred()
         self.__stop_deferred = defer.Deferred()
         self.processProtocol = AsteriskProtocol(self.host, self.__stop_deferred)
@@ -257,67 +266,73 @@
         This function is used to stop this instance of Asterisk.
 
         Returns a deferred that can be used to detect when Asterisk exits,
-        or if it fails to exit
+        or if it fails to exit.
 
         Example Usage:
         asterisk.stop()
         """
 
         def __send_stop_gracefully():
+            """ Send a core stop gracefully CLI command """
             if self.ast_version < AsteriskVersion("1.6.0"):
                 cli_deferred = self.cli_exec("stop gracefully")
             else:
                 cli_deferred = self.cli_exec("core stop gracefully")
-            return cli_deferred
+            cli_deferred.addCallback(__stop_gracefully_callback)
+            cli_deferred.addErrback(__stop_gracefully_error)
+
+        def __stop_gracefully_callback(cli_command):
+            """ Callback handler for the core stop gracefully CLI command """
+            logger.debug("Successfully stopped Asterisk %s" % self.host)
+            self.__stop_attempts = 0
+
+        def __stop_gracefully_error(cli_command):
+            """ Errback for the core stop gracefully CLI command """
+            if self.__stop_attempts > 5:
+                self.__stop_attempts = 0
+                logger.warning("Asterisk graceful stop for %s failed" % self.host)
+            else:
+                logger.debug("Asterisk graceful stop failed, attempting again...")
+                self.__stop_attemps += 1
+                __send_stop_gracefully()
 
         def __send_stop_now():
+            """ Send a core stop now CLI command """
             if self.ast_version < AsteriskVersion("1.6.0"):
                 cli_deferred = self.cli_exec("stop now")
             else:
                 cli_deferred = self.cli_exec("core stop now")
-            return cli_deferred
-
-        def __stop_gracefully_callback(cli_command):
+            cli_deferred.addCallback(__stop_now_callback)
+            cli_deferred.addErrback(__stop_now_error)
+
+        def __stop_now_callback(cli_command):
+            """ Callback handler for the core stop now CLI command """
             logger.debug("Successfully stopped Asterisk %s" % self.host)
-
-        def __stop_gracefully_error(cli_command):
-            if time.time() - self.__stop_time > 5:
-                logger.warning("Asterisk graceful stop for %s failed, attempting stop now" % self.host)
-                self.__stop_time = time.time()
+            self.__stop_attempts = 0
+
+        def __stop_now_error(cli_command):
+            """ Errback handler for the core stop now CLI command """
+            if self.__stop_attempts > 5:
+                self.__stop_attempts = 0
+                logger.warning("Asterisk graceful stop for %s failed" % self.host)
+            else:
+                logger.debug("Asterisk stop now failed, attempting again...")
+                self.__stop_attempts += 1
                 cli_deferred = __send_stop_now()
                 cli_deferred.addCallback(__stop_now_callback)
                 cli_deferred.addErrback(__stop_now_error)
-            else:
-                logger.debug("Asterisk graceful stop failed, attempting again...")
-                cli_deferred = __send_stop_gracefully()
-                cli_deferred.addCallback(__stop_gracefully_callback)
-                cli_deferred.addErrback(__stop_gracefully_error)
-
-        def __stop_now_callback(cli_command):
-            logger.debug("Successfully stopped Asterisk %s" % self.host)
-
-        def __stop_now_error(cli_command):
-            if time.time() - self.__stop_time > 5:
-                logger.warning("Asterisk  stop now for %s failed, attempting TERM" % self.host)
-                # If even a "stop now" didn't do the trick, fall back to sending
-                # signals to the process.  First, send a SIGTERM.  If it _STILL_ hasn't
-                # gone away after another 5 seconds, send SIGKILL.
-                try:
-                    self.process.signalProcess("TERM")
-                    if not self.processProtocol.exited:
-                        reactor.callLater(5, __check_process_and_kill)
-                except twisted.internet.error.ProcessExitedAlready:
-                    # Probably that we sent a signal to a process that was already
-                    # dead.  Just ignore it.
-                    pass
-                logger.debug("Successfully terminated Asterisk %s" % self.host)
-            else:
-                logger.debug("Asterisk stop now failed, attempting again...")
-                cli_deferred = __send_stop_now()
-                cli_deferred.addCallback(__stop_now_callback)
-                cli_deferred.addErrback(__stop_now_error)
-
-        def __check_process_and_kill():
+
+        def __send_term():
+            try:
+                logger.info("Sending TERM to Asterisk %s" % self.host)
+                self.process.signalProcess("TERM")
+            except twisted.internet.error.ProcessExitedAlready:
+                # Probably that we sent a signal to a process that was already
+                # dead.  Just ignore it.
+                pass
+
+        def __send_kill():
+            """ Check to see if the process is running and kill it with fire """
             try:
                 if not self.processProtocol.exited:
                     logger.info("Sending KILL to Asterisk %s" % self.host)
@@ -325,32 +340,36 @@
             except twisted.internet.error.ProcessExitedAlready:
                 # Pass on this
                 pass
-            # If you kill the process, the ProcessProtocol will never get the note that its dead
+            # If you kill the process, the ProcessProtocol will never get the note
+            # that it's dead.  Call the stop callback to notify everyone that we did
+            # indeed kill the Asterisk instance.
             try:
+                # Attempt to signal the process object that it should lose its
+                # connection - it may already be gone however.
                 self.process.loseConnection()
             except:
                 pass
-            self.__stop_deferred.callback("Asterisk %s KILLED" % self.host)
-
-        def __cancel_kill(reason):
-            try:
-                self.__cancel_token.cancel()
-            except ValueError:
-                pass
+            self.__stop_deferred.errback("Asterisk %s KILLED" % self.host)
+
+        def __cancel_stops(reason):
+            """ Cancel all stop actions - called when the process exits """
+            for token in self.__stop_cancel_tokens:
+                try:
+                    token.cancel()
+                except ValueError:
+                    pass
             return reason
 
+        self.__stop_cancel_tokens = []
+        self.__stop_attempts = 0
         # Start by asking to stop gracefully.
-        self.__stop_time = time.time()
-        d = __send_stop_gracefully()
-        d.addCallback(__stop_gracefully_callback)
-        d.addErrback(__stop_gracefully_error)
-
-        # There's a chance that we'll get back a 'yes, I'll stop' from Asterisk, and then it
-        # just won't.  Add a 10 second callback to the reactor to terminate the Asterisk instance
-        # with prejudice if it lies to us.  That should be more then enough time for one of the
-        # previous methods to stop it in a nicer fashion.
-        self.__cancel_token = reactor.callLater(10, __check_process_and_kill)
-        self.__stop_deferred.addCallback(__cancel_kill)
+        __send_stop_gracefully()
+        # Schedule progressively more aggressive mechanisms of stopping Asterisk
+        self.__stop_cancel_tokens.append(reactor.callLater(5, __send_stop_now))
+        self.__stop_cancel_tokens.append(reactor.callLater(10, __send_term))
+        self.__stop_cancel_tokens.append(reactor.callLater(15, __send_kill))
+
+        self.__stop_deferred.addCallback(__cancel_stops)
 
         return self.__stop_deferred
 
@@ -471,8 +490,11 @@
         used. If no extension is given, the 's' extension will be used.
 
         Keyword Arguments:
-        blocking -- When True, do not return from this function until the CLI
-                    command finishes running.  The default is True.
+        blocking -- This used to specify that we should block until the
+        CLI command finished executing.  When the Asterisk process was turned
+        over to twisted, that's no longer the case.  The keyword argument
+        was kept merely for backwards compatibility; callers should *not* expect
+        their calls to block.
 
         Returns:
         A deferred object that can be used to listen for command completion
@@ -508,8 +530,11 @@
 
         Keyword Arguments:
         cli_cmd -- The command to execute.
-        blocking -- When True, the process is spawned as a blocking
-        operation, at least as well as we can with twisted.
+        blocking -- This used to specify that we should block until the
+        CLI command finished executing.  When the Asterisk process was turned
+        over to twisted, that's no longer the case.  The keyword argument
+        was kept merely for backwards compatibility; callers should *not* expect
+        their calls to block.
 
         Returns:
         A deferred object that will be signaled when the process has exited

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py?view=diff&rev=3099&r1=3098&r2=3099
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/sipp.py Mon Mar 19 08:29:03 2012
@@ -14,13 +14,13 @@
 import os
 import subprocess
 import logging
-
-from twisted.internet import reactor
+import TestSuiteUtils
+
+from twisted.internet import reactor, defer, utils, protocol
 from asterisk import Asterisk
 from TestCase import TestCase
 
 logger = logging.getLogger(__name__)
-
 
 class SIPpScenario:
     """
@@ -63,13 +63,45 @@
         self.positional_args = tuple(positional_args) # don't allow caller to mangle his own list
         self.test_dir = test_dir
         self.default_port = 5061
-        self.sipp = None
+        self.sipp = TestSuiteUtils.which("sipp")
+        self.passed = False
 
     def run(self):
+        """ Execute a SIPp scenario
+
+        Execute the SIPp scenario that was passed to this object
+
+        Returns:
+        A deferred that can be used to determine when the SIPp Scenario
+        has exited.
+        """
+
+        def __output_callback(result):
+            """ Callback from getProcessOutputAndValue """
+            out, err, code = result
+            logger.debug(out)
+            logger.debug("Launching SIPp Scenario %s exited %d"
+                % (self.scenario['scenario'], code))
+            if (code == 0 or code != 99):
+                self.passed = True
+                logger.info("SIPp Scenario %s Started" % (self.scenario['scenario']))
+                self.__exit_deferred.callback(self)
+            else:
+                logger.warning("Launching SIPp Scenario %s Failed" % (self.scenario['scenario']))
+                self.__exit_deferred.errback(self)
+
+        def __error_callback(result):
+            """ Errback from getProcessOutputAndValue """
+            out, err, code = result
+            logger.warning("Launching of SIPp Scenario %s exited %d with error: %s"
+                % (self.scenario['scenario'], code, err))
+            self.__exit_deferred.errback(self)
+
         sipp_args = [
-                'sipp', '127.0.0.1',
+                self.sipp, '127.0.0.1',
                 '-sf', '%s/sipp/%s' % (self.test_dir, self.scenario['scenario']),
-                '-nostdin'
+                '-nostdin', '-bg', '-trace_msg',
+                '-skip_rlimit',
         ]
         default_args = {
             '-p' : self.default_port,
@@ -87,27 +119,14 @@
         sipp_args.extend(self.positional_args)
 
         logger.info("Executing SIPp scenario: %s" % self.scenario['scenario'])
-        logger.info(sipp_args)
-
-        self.sipp = subprocess.Popen(sipp_args,
-                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
-    def waitAndEvaluate(self):
-
-        (out, err) = self.sipp.communicate()
-
-        result = self.sipp.wait()
-        logger.debug(out)
-        if result:
-            logger.warn("SIPp scenario FAILED")
-            passed = False
-            logger.warn(err)
-        else:
-            logger.info("SIPp scenario PASSED")
-            passed = True
-
-        return passed
-
+        logger.debug(sipp_args)
+
+        self.__exit_deferred = defer.Deferred()
+
+        df = utils.getProcessOutputAndValue(sipp_args[0], sipp_args)
+        df.addCallback(__output_callback)
+        df.addErrback(__error_callback)
+        return self.__exit_deferred
 
 class SIPpTest(TestCase):
     """




More information about the asterisk-commits mailing list