[asterisk-commits] mjordan: testsuite/asterisk/trunk r3132 - in /asterisk/trunk: ./ lib/python/a...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Mar 23 08:47:41 CDT 2012


Author: mjordan
Date: Fri Mar 23 08:47:32 2012
New Revision: 3132

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=3132
Log:
Migration of Asterisk Test Suite to use twisted for process management

The standard Python subprocess module and the twisted framework do not
interact well together.  Both twisted and subprocess attempt to handle
SIGCHLD signals, with the result that (potentially) one or the other
fails in some fashion.  In some tests, most notably the lightweight NAT
tests developed in support of Asterisk 11, this manifests as an
unresponsive twisted reactor.

This patch removes the dependency on the Python subprocess module for
managing Asterisk and SIPp processes, and instead uses twisted process
management.  This has the added benefit of improving performance for
some tests.

(closes issue ASTERISK-19409)
Reported by: Matt Jordan
Tested by: Matt Jordan, Paul Belanger, Josh Colp

Review: https://reviewboard.asterisk.org/r/1759/


Added:
    asterisk/trunk/tests/iax2/basic-call/configs/ast2/cdr.conf
      - copied unchanged from r3104, asterisk/team/mjordan/twisted_process/tests/iax2/basic-call/configs/ast2/cdr.conf
Removed:
    asterisk/trunk/lib/python/asterisk/utils.py
    asterisk/trunk/tests/chanspy/chanspy_barge/configs/ast1/manager.conf
    asterisk/trunk/tests/iax2/basic-call/configs/ast1/manager.conf
    asterisk/trunk/tests/iax2/basic-call/configs/ast2/manager.conf
    asterisk/trunk/tests/udptl_v6/configs/ast1/manager.conf
    asterisk/trunk/tests/udptl_v6/configs/ast2/manager.conf
Modified:
    asterisk/trunk/   (props changed)
    asterisk/trunk/lib/python/asterisk/CDRTestCase.py
    asterisk/trunk/lib/python/asterisk/TestCase.py
    asterisk/trunk/lib/python/asterisk/TestConfig.py
    asterisk/trunk/lib/python/asterisk/asterisk.py
    asterisk/trunk/lib/python/asterisk/cdr.py
    asterisk/trunk/lib/python/asterisk/sipp.py
    asterisk/trunk/lib/python/asterisk/sippversion.py
    asterisk/trunk/lib/python/asterisk/version.py
    asterisk/trunk/runtests.py
    asterisk/trunk/tests/apps/incomplete/sip_incomplete/run-test
    asterisk/trunk/tests/apps/voicemail/check_voicemail_forward_with_prepend/run-test
    asterisk/trunk/tests/apps/voicemail/check_voicemail_new_user_hangup/run-test
    asterisk/trunk/tests/apps/voicemail/func_vmcount/run-test
    asterisk/trunk/tests/channels/SIP/handle_response_address_incomplete/run-test
    asterisk/trunk/tests/channels/SIP/info_dtmf/run-test
    asterisk/trunk/tests/channels/SIP/nat_supertest/run-test
    asterisk/trunk/tests/channels/SIP/nat_supertest/sipp/inject.csv
    asterisk/trunk/tests/channels/SIP/nat_supertest/test-config.yaml
    asterisk/trunk/tests/channels/SIP/realtime_nosipregs/run-test
    asterisk/trunk/tests/channels/SIP/realtime_sipregs/run-test
    asterisk/trunk/tests/channels/SIP/sip_hold/run-test
    asterisk/trunk/tests/channels/SIP/sip_register_domain_acl/run-test
    asterisk/trunk/tests/channels/SIP/sip_tls_call/configs/ast1/extensions.conf
    asterisk/trunk/tests/channels/SIP/sip_tls_call/configs/ast2/extensions.conf
    asterisk/trunk/tests/channels/SIP/sip_tls_register/run-test
    asterisk/trunk/tests/channels/SIP/use_contact_from_200/run-test
    asterisk/trunk/tests/chanspy/chanspy_barge/run-test
    asterisk/trunk/tests/chanspy/chanspy_w_mixmonitor/run-test
    asterisk/trunk/tests/dynamic-modules/run-test
    asterisk/trunk/tests/dynamic-modules/test-config.yaml
    asterisk/trunk/tests/fastagi/get-data/run-test
    asterisk/trunk/tests/fastagi/hangup/run-test
    asterisk/trunk/tests/fastagi/say-alpha/run-test
    asterisk/trunk/tests/fastagi/say-date/run-test
    asterisk/trunk/tests/fastagi/say-datetime/run-test
    asterisk/trunk/tests/fastagi/say-digits/run-test
    asterisk/trunk/tests/fastagi/say-number/run-test
    asterisk/trunk/tests/fastagi/say-phonetic/run-test
    asterisk/trunk/tests/fastagi/say-time/run-test
    asterisk/trunk/tests/feature_attended_transfer/run-test
    asterisk/trunk/tests/feature_blonde_transfer/run-test
    asterisk/trunk/tests/iax2/basic-call/configs/ast1/extensions.conf
    asterisk/trunk/tests/iax2/basic-call/configs/ast2/extensions.conf
    asterisk/trunk/tests/iax2/basic-call/run-test
    asterisk/trunk/tests/mixmonitor/run-test
    asterisk/trunk/tests/mixmonitor_audiohook_inherit/run-test
    asterisk/trunk/tests/udptl_v6/run-test

Propchange: asterisk/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Mar 23 08:47:32 2012
@@ -1,0 +1,2 @@
+/asterisk/team/group/cdr_test_log_congestion:1823-2951
+/asterisk/trunk:1112

Propchange: asterisk/trunk/
------------------------------------------------------------------------------
    svnmerge-integrated = /asterisk/trunk:1-2945

Modified: asterisk/trunk/lib/python/asterisk/CDRTestCase.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/lib/python/asterisk/CDRTestCase.py?view=diff&rev=3132&r1=3131&r2=3132
==============================================================================
--- asterisk/trunk/lib/python/asterisk/CDRTestCase.py (original)
+++ asterisk/trunk/lib/python/asterisk/CDRTestCase.py Fri Mar 23 08:47:32 2012
@@ -73,7 +73,8 @@
         self.CDRFileExpectations[recordname].append(cdrline)
 
     def match_cdrs(self):
-        #Automatically invoked at the end of the test, this will test the CDR files against the listed expectations.
+        """ Automatically invoked at the end of the test, this will test the CDR files against the listed expectations.
+        """
         self.passed = True
 
         for key in self.CDRFileExpectations:
@@ -114,7 +115,6 @@
     def ami_logoff(self, ami):
         """
         An AMI callback event.
-
         """
         self.stop_reactor()
 
@@ -131,13 +131,25 @@
         ).addErrback(self.ami_logoff)
 
     def ami_test_done(self, ami, event):
+        """ Check to see if the test is done during a hangup event
+        """
+        logger.debug(str(event))
         if event.get("event") == "Hangup":
-            if self.no_active_channels():
-                try:
-                    self.stop_reactor()
-                except ReactorNotRunning:
-                    # No problemo.
-                    pass
+            self.check_active_channels()
+
+    def check_active_channels(self):
+        """ Check to see if we have any active channels.  If there are none, stop the reactor.
+        """
+        def __parse_output(result):
+            first_line = result.output.split('\n', 1)[0]
+            first_number = first_line.split(' ', 1)[0]
+            if first_number == '0':
+                logger.debug("No channels detected; stopping reactor")
+                self.stop_reactor()
+
+        for asterisk in self.ast:
+            cli_deferred = asterisk.cli_exec('core show channels count')
+            cli_deferred.addCallback(__parse_output)
 
     def run(self):
         """

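The parsing done in __parse_output above can be sketched on its own as follows. This is an illustration, not part of the commit: the function names are hypothetical, and the sample text only assumes that the first line of 'core show channels count' starts with the channel count (the removed no_active_channels code documents a first line of "0 active channels"). The second helper keeps the stricter rule of the removed code, where every instance must be idle; whether that is still the intent is an assumption.

```python
def channel_count_is_zero(cli_output):
    """Return True if the first token of the first line is '0'."""
    first_line = cli_output.split('\n', 1)[0]
    first_number = first_line.split(' ', 1)[0]
    return first_number == '0'


def all_instances_idle(cli_outputs):
    """Return True only if every instance reports zero active channels."""
    return all(channel_count_is_zero(output) for output in cli_outputs)


# Sample outputs; only the first line matters to the parser.
idle = "0 active channels\n0 active calls\n"
busy = "2 active channels\n1 active call\n"
```

With these samples, all_instances_idle([idle, idle]) is True and all_instances_idle([idle, busy]) is False. By contrast, the callback added in this diff stops the reactor as soon as any single instance reports zero channels.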
Modified: asterisk/trunk/lib/python/asterisk/TestCase.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/lib/python/asterisk/TestCase.py?view=diff&rev=3132&r1=3131&r2=3132
==============================================================================
--- asterisk/trunk/lib/python/asterisk/TestCase.py (original)
+++ asterisk/trunk/lib/python/asterisk/TestCase.py Fri Mar 23 08:47:32 2012
@@ -13,7 +13,7 @@
 import os
 import datetime
 import time
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from starpy import manager, fastagi
 
 from asterisk import Asterisk
@@ -59,6 +59,7 @@
         self.testStateController = None
         self.pcap = None
         self.pcapfilename = None
+        self.__stopping = False
         self.testlogdir = os.path.join(Asterisk.test_suite_root, self.base, str(os.getpid()))
         self.ast_version = AsteriskVersion()
 
@@ -84,7 +85,7 @@
         self.__setup_conditions()
 
         logger.info("Executing " + self.test_name)
-        reactor.callWhenRunning(self.run)
+        reactor.callWhenRunning(self.__run)
 
     def __setup_conditions(self):
         """
@@ -199,63 +200,120 @@
         return PcapListener(device, bpf_filter, dumpfile, self.__pcap_callback)
 
     def start_asterisk(self):
+        """ This is kept mostly for legacy support.  When the TestCase previously
+        used a synchronous, blocking mechanism independent of the twisted
+        reactor to spawn Asterisk, this method was called.  Now the Asterisk
+        instances are started immediately after the reactor has started.
+
+        Derived classes can still override this and have it called when
+        the reactor is running, but immediately before instances of Asterisk
+        are launched.
+        """
+        pass
+
+    def __start_asterisk(self):
         """
         Start the instances of Asterisk that were previously created.  See
-        create_asterisk.  Note that this should be called before the reactor is
-        told to run.
-        """
+        create_asterisk.  Note that this should be the first thing called
+        when the reactor has started to run.
+        """
+        def __check_success_failure(result):
+            """ Make sure the instances started properly """
+            for (success, value) in result:
+                if not success:
+                    logger.error(value)
+                    self.stop_reactor()
+            return result
+
+        def __perform_pre_checks(result):
+            """ Execute the pre-condition checks """
+            self.testConditionController.evaluate_pre_checks()
+            return result
+
+        def __run_callback(result):
+            """ Notify the test that we are running """
+            self.run()
+            return result
+
+        # Call the method that derived objects can override
+        self.start_asterisk()
+
+        # Gather up the deferred objects from each of the instances of Asterisk
+        # and wait until all are finished before proceeding
+        start_defers = []
         for index, item in enumerate(self.ast):
             logger.info("Starting Asterisk instance %d" % (index + 1))
-            self.ast[index].start()
-        self.testConditionController.evaluate_pre_checks()
+            temp_defer = self.ast[index].start()
+            start_defers.append(temp_defer)
+
+        d = defer.DeferredList(start_defers, consumeErrors=True)
+        d.addCallback(__check_success_failure)
+        d.addCallback(__perform_pre_checks)
+        d.addCallback(__run_callback)
 
     def stop_asterisk(self):
         """
-        Stop the instances of Asterisk that were previously started.  See
-        start_asterisk.  Note that this should be called after the reactor has
-        returned from its run.
-
-        If there were errors exiting asterisk, this function will return False.
-        """
-        res = True
+        Called when the instances of Asterisk are being stopped.  Note that previously,
+        this explicitly stopped the Asterisk instances: now they are stopped automatically
+        when the reactor is stopped.
+
+        Derived classes can still override this and have it called when
+        the reactor is running, but immediately before instances of Asterisk
+        are stopped.
+        """
+        pass
+
+    def __stop_asterisk(self):
+        """ Stops the instances of Asterisk.
+
+        Stops the instances of Asterisk - called when stop_reactor is called.  This
+        returns a deferred object that can be used to be notified when all instances
+        of Asterisk have stopped.
+         """
+        def __check_success_failure(result):
+            """ Make sure the instances stopped properly """
+            for (success, value) in result:
+                if not success:
+                    logger.warning(value)
+                    # This should already be called when the reactor is being terminated.
+                    # If we couldn't stop the instance of Asterisk, there isn't much else to do
+                    # here other than complain
+            return result
+
+        # Call the overridable method
+        self.stop_asterisk()
+
         self.testConditionController.evaluate_post_checks()
+
+        # Gather up the stopped defers; check success failure of stopping when
+        # all instances of Asterisk have stopped
+        stop_defers = []
         for index, item in enumerate(self.ast):
             logger.info("Stopping Asterisk instance %d" % (index + 1))
-            returncode = self.ast[index].stop()
-            if returncode < 0:
-                # XXX setting passed here might be overridden later in a
-                # derived class. This is bad.
-                self.passed = False
-                logger.error("Asterisk instance %d exited with signal %d" % (index + 1, abs(returncode)))
-                res = False
-            elif returncode > 0:
-                # XXX same here
-                self.passed = False
-                logger.error("Asterisk instance %d exited with non-zero return code %d" % (index + 1, returncode))
-                res = False
-
-        return res
-
-    def no_active_channels(self):
-        """
-        Return true if all our asterisk children have 0 active channels.
-        """
-        for asterisk in self.ast:
-            # 0 active channels
-            first_line = asterisk.cli_exec('core show channels count').split('\n', 1)[0]
-            # 0
-            first_number = first_line.split(' ', 1)[0]
-            if first_number != '0':
-                return False
-        return True
+            temp_defer = self.ast[index].stop()
+            stop_defers.append(temp_defer)
+
+        d = defer.DeferredList(stop_defers, consumeErrors=True)
+        d.addCallback(__check_success_failure)
+        return d
 
     def stop_reactor(self):
         """
         Stop the reactor and cancel the test.
         """
-        logger.info("Stopping Reactor")
-        if reactor.running:
-            reactor.stop()
+        def __stop_reactor(result):
+            """ Called when the Asterisk instances are stopped """
+            logger.info("Stopping Reactor")
+            if reactor.running:
+                try:
+                    reactor.stop()
+                except twisted.internet.error.ReactorNotRunning:
+                    # Something stopped it between our checks - at least we're stopped
+                    pass
+        if not self.__stopping:
+            df = self.__stop_asterisk()
+            df.addCallback(__stop_reactor)
+            self.__stopping = True
 
     def __reactor_timeout(self):
         """
@@ -264,6 +322,18 @@
         """
         logger.warning("Reactor timeout: '%s' seconds" % self.reactor_timeout)
         self.stop_reactor()
+
+    def __run(self):
+        """
+        Private entry point called when the reactor first starts up.
+        This needs to first ensure that Asterisk is fully up and running before
+        moving on.
+        """
+        if (self.ast):
+            self.__start_asterisk()
+        else:
+            # If no instances of Asterisk are needed, go ahead and just run
+            self.run()
 
     def run(self):
         """
@@ -305,7 +375,6 @@
         pass
 
     def __pcap_callback(self, packet):
-        logger.debug("Received packet: %s\n" % (packet,))
         self.pcap_callback(packet)
 
     def handleOriginateFailure(self, reason):

Modified: asterisk/trunk/lib/python/asterisk/TestConfig.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/lib/python/asterisk/TestConfig.py?view=diff&rev=3132&r1=3131&r2=3132
==============================================================================
--- asterisk/trunk/lib/python/asterisk/TestConfig.py (original)
+++ asterisk/trunk/lib/python/asterisk/TestConfig.py Fri Mar 23 08:47:32 2012
@@ -12,14 +12,13 @@
 import sys
 import os
 import subprocess
-import optparse
-import time
 import yaml
 import socket
 
 sys.path.append("lib/python")
 
-import utils
+import TestSuiteUtils
+
 from version import AsteriskVersion
 from asterisk import Asterisk
 from buildoptions import AsteriskBuildOptions
@@ -107,7 +106,7 @@
         self.met = False
         if "app" in dep:
             self.name = dep["app"]
-            self.met = utils.which(self.name) is not None
+            self.met = TestSuiteUtils.which(self.name) is not None
         elif "python" in dep:
             self.name = dep["python"]
             try:
@@ -184,7 +183,7 @@
         we run pjsua --help and parse the output to determine if --ipv6
         is a valid option
         '''
-        if utils.which('pjsua') is None:
+        if TestSuiteUtils.which('pjsua') is None:
             return False
 
         help_output = subprocess.Popen(['pjsua', '--help'],

Modified: asterisk/trunk/lib/python/asterisk/asterisk.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/lib/python/asterisk/asterisk.py?view=diff&rev=3132&r1=3131&r2=3132
==============================================================================
--- asterisk/trunk/lib/python/asterisk/asterisk.py (original)
+++ asterisk/trunk/lib/python/asterisk/asterisk.py Fri Mar 23 08:47:32 2012
@@ -13,17 +13,130 @@
 
 import sys
 import os
-import signal
 import time
 import shutil
-import subprocess
-import utils
 import logging
+
+import TestSuiteUtils
 
 from config import ConfigFile
 from version import AsteriskVersion
 
+from twisted.internet import reactor, protocol, defer, utils, error
+
 logger = logging.getLogger(__name__)
+
+class AsteriskCliCommand():
+    """
+    Class that manages an Asterisk CLI command.
+    """
+
+    def __init__(self, host, cmd):
+        """ Create a new Asterisk CLI Protocol instance
+
+        This class wraps an Asterisk instance that executes a CLI command
+        against another instance of Asterisk.
+
+        Keyword Arguments:
+        host    The host this CLI instance will connect to
+        cmd     List of command arguments to spawn.  The first argument must be
+        the location of the Asterisk executable; each subsequent argument should define
+        the CLI command to run and the instance of Asterisk to run it against.
+        """
+        self.__host = host
+        self.__cmd = cmd
+        self.exitcode = -1
+        self.output = ""
+        self.err = ""
+
+    def execute(self):
+        """ Execute the CLI command.
+
+        Returns a deferred that will be called when the operation completes.  The
+        parameter to the deferred is this object.
+        """
+        def __cli_output_callback(result):
+            """ Callback from getProcessOutputAndValue """
+            self.__set_properties(result)
+            logger.debug("Asterisk CLI %s exited %d" % (self.__host, self.exitcode))
+            if self.exitcode:
+                self.__deferred.errback(self)
+            else:
+                self.__deferred.callback(self)
+
+        def __cli_error_callback(result):
+            """ Errback from getProcessOutputAndValue """
+            self.__set_properties(result)
+            logger.warning("Asterisk CLI %s exited %d with error: %s" % (self.__host, self.exitcode, self.err))
+            self.__deferred.errback(self)
+
+        self.__deferred = defer.Deferred()
+        df = utils.getProcessOutputAndValue(self.__cmd[0], self.__cmd)
+        df.addCallbacks(__cli_output_callback, __cli_error_callback)
+
+        return self.__deferred
+
+    def __set_properties(self, result):
+        """ Set the properties based on the result of the getProcessOutputAndValue call """
+        out, err, code = result
+        self.exitcode = code
+        self.output = out
+        self.err = err
+
+class AsteriskProtocol(protocol.ProcessProtocol):
+    """
+    Class that manages an Asterisk instance
+    """
+
+    def __init__(self, host, stop_deferred):
+        """ Create an AsteriskProtocol object
+
+        Create an AsteriskProtocol object, which manages the interactions with
+        the Asterisk process
+
+        Keyword Arguments:
+        host - the hostname or address of the Asterisk instance
+        stop_deferred - a twisted Deferred object that will be called when the
+        process has exited
+        """
+
+        self.output = ""
+        self.__host = host
+        self.exitcode = 0
+        self.exited = False
+        self.__stop_deferred = stop_deferred
+
+    def outReceived(self, data):
+        """ Override of ProcessProtocol.outReceived """
+        logger.debug("Asterisk %s received: %s" % (self.__host, data))
+        self.output += data
+
+    def connectionMade(self):
+        """ Override of ProcessProtocol.connectionMade """
+        logger.debug("Asterisk %s - connection made" % (self.__host))
+
+    def errReceived(self, data):
+        """ Override of ProcessProtocol.errReceived """
+        logger.warning("Asterisk %s received error: %s" % (self.__host, data))
+
+    def processEnded(self, reason):
+        """ Override of ProcessProtocol.processEnded """
+        message = ""
+        if reason.value and reason.value.exitCode:
+            message = "Asterisk %s ended with code %d" % (self.__host, reason.value.exitCode,)
+            self.exitcode = reason.value.exitCode
+        else:
+            message = "Asterisk %s ended " % self.__host
+        try:
+            # When Asterisk is terminated with a KILL signal, this method may never
+            # be called, in which case the Asterisk object that is terminating the
+            # process will fire the stop deferred itself.  Guard against firing the
+            # deferred twice.
+            if not self.__stop_deferred.called:
+                self.__stop_deferred.callback(message)
+        except defer.AlreadyCalledError:
+            logger.warning("Asterisk %s stop deferred already called" % self.__host)
+        self.exited = True
 
 class Asterisk:
     """An instance of Asterisk.
@@ -68,7 +181,7 @@
         if base is not None:
             self.base = "%s/%s" % (self.base, base)
         self.astetcdir = Asterisk.asterisk_etc_directory
-        self.ast_binary = utils.which("asterisk") or "/usr/sbin/asterisk"
+        self.ast_binary = TestSuiteUtils.which("asterisk") or "/usr/sbin/asterisk"
         self.host = host
 
         self.__ast_conf_options = ast_conf_options
@@ -105,9 +218,10 @@
                     self.directories[var] = val
 
     def start(self):
-        """Start this instance of Asterisk.
-
-        This function starts up this instance of Asterisk.
+        """ Start this instance of Asterisk.
+
+        Returns:
+        A deferred object that will be called when Asterisk is fully booted.
 
         Example Usage:
         asterisk.start()
@@ -115,6 +229,21 @@
         Note that calling this will install the default testsuite
         config files, if they have not already been installed
         """
+
+        def __wait_fully_booted_callback(cli_command):
+            """ Callback for CLI command waitfullybooted """
+            self.__start_deferred.callback("Successfully started Asterisk %s" % self.host)
+
+        def __wait_fully_booted_error(cli_command):
+            """ Errback for CLI command waitfullybooted """
+            if time.time() - self.__start_asterisk_time > 5:
+                logger.error("Asterisk core waitfullybooted for %s failed" % self.host)
+                self.__start_deferred.errback("Command core waitfullybooted failed")
+            else:
+                logger.debug("Asterisk core waitfullybooted failed, attempting again...")
+                cli_deferred = self.cli_exec("core waitfullybooted")
+                cli_deferred.addCallbacks(__wait_fully_booted_callback, __wait_fully_booted_error)
+
         self.install_configs(os.getcwd() + "/configs")
         self.__setup_configs()
 
@@ -123,85 +252,143 @@
             "-f", "-g", "-q", "-m", "-n",
             "-C", "%s" % os.path.join(self.astetcdir, "asterisk.conf")
         ]
-        try:
-            self.process = subprocess.Popen(cmd)
-        except OSError:
-            logger.error("Failed to execute command: %s" % str(cmd))
-            return False
-
-        # Be _really_ sure that Asterisk has started up before returning.
-
-        # Poll the instance to make sure we created it successfully
-        self.process.poll()
-        if self.process.returncode != None:
-            """ Rut roh, Asterisk process exited prematurely """
-            logger.error("Asterisk instance %s exited prematurely with return code %d" % (self.host, self.process.returncode))
-
-        start = time.time()
-        while True:
-            # This command should stall until completed, but if an
-            # exception occurs, it returns the empty string.
-            if not self.cli_exec("core waitfullybooted", warn_on_fail=False):
-                if time.time() - start > 5:
-                    logger.error("Unknown state of asterisk. Stopping waitfullybooted...")
-                    break
-                logger.debug("Attempting waitfullybooted again...")
-            else:
-                # We're fully booted...
-                break
+
+        # Make the start/stop deferreds - this method will return
+        # the start deferred, and pass the stop deferred to the AsteriskProtocol
+        # object.  The stop deferred will be raised when the Asterisk process
+        # exits
+        self.__start_deferred = defer.Deferred()
+        self.__stop_deferred = defer.Deferred()
+        self.processProtocol = AsteriskProtocol(self.host, self.__stop_deferred)
+        self.process = reactor.spawnProcess(self.processProtocol, cmd[0], cmd)
+
+        # Begin the wait fully booted cycle
+        self.__start_asterisk_time = time.time()
+        cli_deferred = self.cli_exec("core waitfullybooted")
+        cli_deferred.addCallbacks(__wait_fully_booted_callback, __wait_fully_booted_error)
+        return self.__start_deferred
 
     def stop(self):
         """Stop this instance of Asterisk.
 
         This function is used to stop this instance of Asterisk.
 
+        Returns:
+        A deferred that can be used to detect when Asterisk exits,
+        or if it fails to exit.
+
         Example Usage:
         asterisk.stop()
         """
+
+        def __send_stop_gracefully():
+            """ Send a core stop gracefully CLI command """
+            if self.ast_version < AsteriskVersion("1.6.0"):
+                cli_deferred = self.cli_exec("stop gracefully")
+            else:
+                cli_deferred = self.cli_exec("core stop gracefully")
+            cli_deferred.addCallbacks(__stop_gracefully_callback, __stop_gracefully_error)
+
+        def __stop_gracefully_callback(cli_command):
+            """ Callback handler for the core stop gracefully CLI command """
+            logger.debug("Successfully stopped Asterisk %s" % self.host)
+            self.__stop_attempts = 0
+
+        def __stop_gracefully_error(cli_command):
+            """ Errback for the core stop gracefully CLI command """
+            if self.__stop_attempts > 5:
+                self.__stop_attempts = 0
+                logger.warning("Asterisk graceful stop for %s failed" % self.host)
+            else:
+                logger.debug("Asterisk graceful stop failed, attempting again...")
+                self.__stop_attempts += 1
+                __send_stop_gracefully()
+
+        def __send_stop_now():
+            """ Send a core stop now CLI command """
+            if self.ast_version < AsteriskVersion("1.6.0"):
+                cli_deferred = self.cli_exec("stop now")
+            else:
+                cli_deferred = self.cli_exec("core stop now")
+            if cli_deferred:
+                cli_deferred.addCallbacks(__stop_now_callback, __stop_now_error)
+
+        def __stop_now_callback(cli_command):
+            """ Callback handler for the core stop now CLI command """
+            logger.debug("Successfully stopped Asterisk %s" % self.host)
+            self.__stop_attempts = 0
+
+        def __stop_now_error(cli_command):
+            """ Errback handler for the core stop now CLI command """
+            if self.__stop_attempts > 5:
+                self.__stop_attempts = 0
+                logger.warning("Asterisk stop now for %s failed" % self.host)
+            else:
+                logger.debug("Asterisk stop now failed, attempting again...")
+                self.__stop_attempts += 1
+                cli_deferred = __send_stop_now()
+                if cli_deferred:
+                    cli_deferred.addCallbacks(__stop_now_callback, __stop_now_error)
+
+        def __send_term():
+            """ Send a TERM signal to the Asterisk instance """
+            try:
+                logger.info("Sending TERM to Asterisk %s" % self.host)
+                self.process.signalProcess("TERM")
+            except error.ProcessExitedAlready:
+                # Probably that we sent a signal to a process that was already
+                # dead.  Just ignore it.
+                pass
+
+        def __send_kill():
+            """ Check to see if the process is running and kill it with fire """
+            try:
+                if not self.processProtocol.exited:
+                    logger.info("Sending KILL to Asterisk %s" % self.host)
+                    self.process.signalProcess("KILL")
+            except error.ProcessExitedAlready:
+                # Pass on this
+                pass
+            # If you kill the process, the ProcessProtocol may never get the note
+            # that it's dead.  Call the stop callback to notify everyone that we did
+            # indeed kill the Asterisk instance.
+            try:
+                # Attempt to signal the process object that it should lose its
+                # connection - it may already be gone however.
+                self.process.loseConnection()
+            except:
+                pass
+            try:
+                if not self.__stop_deferred.called:
+                    self.__stop_deferred.callback("Asterisk %s KILLED" % self.host)
+            except defer.AlreadyCalledError:
+                logger.warning("Asterisk %s stop deferred already called" % self.host)
+
+        def __cancel_stops(reason):
+            """ Cancel all stop actions - called when the process exits """
+            for token in self.__stop_cancel_tokens:
+                try:
+                    if token.active():
+                        token.cancel()
+                except error.AlreadyCalled:
+                    # If we're canceling something that's already been called, move on
+                    pass
+            return reason
+
+        self.__stop_cancel_tokens = []
+        self.__stop_attempts = 0
         # Start by asking to stop gracefully.
-        if self.ast_version < AsteriskVersion("1.6.0"):
-            self.cli_exec("stop gracefully")
-        else:
-            self.cli_exec("core stop gracefully")
-        for i in xrange(5):
-            time.sleep(1.0)
-            if self.process.poll() is not None:
-                return self.process.returncode
-
-        # Check for locks
-        self.cli_exec("core show locks")
-
-        # If the graceful shutdown did not complete within 5 seconds, ask
-        # Asterisk to stop right now.
-        if self.ast_version < AsteriskVersion("1.6.0"):
-            self.cli_exec("stop now")
-        else:
-            self.cli_exec("core stop now")
-        for i in xrange(5):
-            time.sleep(1.0)
-            if self.process.poll() is not None:
-                return self.process.returncode
-
-        # If even a "stop now" didn't do the trick, fall back to sending
-        # signals to the process.  First, send a SIGTERM.  If it _STILL_ hasn't
-        # gone away after another 5 seconds, send SIGKILL.
-        try:
-            os.kill(self.process.pid, signal.SIGTERM)
-            for i in xrange(5):
-                time.sleep(1.0)
-                if self.process.poll() is not None:
-                    return self.process.returncode
-            os.kill(self.process.pid, signal.SIGKILL)
-        except OSError:
-            # Probably that we sent a signal to a process that was already
-            # dead.  Just ignore it.
-            pass
-
-        # We have done everything we can do at this point.  Wait for the
-        # process to exit.
-        self.process.wait()
-
-        return self.process.returncode
+        __send_stop_gracefully()
+
+        # Schedule progressively more aggressive mechanisms of stopping Asterisk.  If any
+        # stop mechanism succeeds, all are canceled
+        self.__stop_cancel_tokens.append(reactor.callLater(5, __send_stop_now))
+        self.__stop_cancel_tokens.append(reactor.callLater(10, __send_term))
+        self.__stop_cancel_tokens.append(reactor.callLater(15, __send_kill))
+
+        self.__stop_deferred.addCallback(__cancel_stops)
+
+        return self.__stop_deferred
 
     def install_configs(self, cfg_path):
         """Installs all files located in the configuration directory for this
@@ -320,8 +507,14 @@
         used. If no extension is given, the 's' extension will be used.
 
         Keyword Arguments:
-        blocking -- When True, do not return from this function until the CLI
-                    command finishes running.  The default is True.
+        blocking -- This used to specify that we should block until the
+        CLI command finished executing.  Now that the Asterisk process is
+        managed by twisted, that is no longer the case.  The keyword
+        argument is kept only for backwards compatibility; callers should
+        *not* expect their calls to block.
+
+        Returns:
+        A deferred object that can be used to listen for command completion
 
         Example Usage:
         asterisk.originate("Local/a_exten@context extension b_exten@context")
@@ -345,17 +538,23 @@
             <tech/data> extension <exten>@<context>"
 
         if self.ast_version < AsteriskVersion("1.6.2"):
-            self.cli_exec("originate %s" % argstr, blocking=blocking)
+            return self.cli_exec("originate %s" % argstr, blocking=blocking)
         else:
-            self.cli_exec("channel originate %s" % argstr, blocking=blocking)
+            return self.cli_exec("channel originate %s" % argstr, blocking=blocking)
 
     def cli_exec(self, cli_cmd, blocking=True, warn_on_fail=True):
         """Execute a CLI command on this instance of Asterisk.
 
         Keyword Arguments:
         cli_cmd -- The command to execute.
-        blocking -- When True, do not return from this function until the CLI
-                    command finishes running.  The default is True.
+        blocking -- This used to specify that we should block until the
+        CLI command finished executing.  Now that the Asterisk process is
+        managed by twisted, that is no longer the case.  The keyword
+        argument is kept only for backwards compatibility; callers should
+        *not* expect their calls to block.
+
+        Returns:
+        A deferred object that will be signaled when the process has exited
 
         Example Usage:
         asterisk.cli_exec("core set verbose 10")
@@ -370,32 +569,8 @@
         ]
         logger.debug("Executing %s ..." % cmd)
 
-        if not blocking:
-            process = subprocess.Popen(cmd)
-            return ""
-
-        try:
-            process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
-                                       stderr=subprocess.STDOUT)
-        except OSError:
-            warn("Failed to execute command: %s" % str(cmd))
-            return ""
-
-        output = ""
-        try:
-            for l in process.stdout.readlines():
-                logger.debug(l.rstrip())
-                output += l
-        except IOError:
-            pass
-        try:
-            res = process.wait()
-            if res != None and res != 0:
-                warn("Exited non-zero [%d] while executing command %s" % (res, str(cmd)))
-                output = ""
-        except OSError:
-            pass
-        return output
+        cliProtocol = AsteriskCliCommand(self.host, cmd)
+        return cliProtocol.execute()
 
     def __make_directory_structure(self):
         """ Mirror system directory structure """

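The stop logic in the asterisk.py diff above schedules progressively harsher stop actions and cancels whichever are still pending once the process exits. The following is a minimal, self-contained sketch of that pattern, not the test suite's code. FakeClock and FakeToken are hypothetical stand-ins for twisted's reactor.callLater and the delayed-call objects it returns, so the example runs without twisted, and the logged action names are placeholders.

```python
class FakeToken(object):
    """Hypothetical stand-in for the object returned by reactor.callLater."""
    def __init__(self, when, func):
        self.when = when
        self.func = func
        self.cancelled = False
        self.called = False

    def active(self):
        return not (self.cancelled or self.called)

    def cancel(self):
        self.cancelled = True


class FakeClock(object):
    """Hypothetical stand-in for the reactor: time advances only on request."""
    def __init__(self):
        self.now = 0
        self.tokens = []

    def callLater(self, delay, func):
        token = FakeToken(self.now + delay, func)
        self.tokens.append(token)
        return token

    def advance(self, seconds):
        self.now += seconds
        for token in sorted(self.tokens, key=lambda t: t.when):
            if token.active() and token.when <= self.now:
                token.called = True
                token.func()


def schedule_stop(clock, log):
    """Ask politely first, then schedule harsher fallbacks at 5, 10 and 15 s."""
    log.append("stop gracefully")
    tokens = [
        clock.callLater(5, lambda: log.append("stop now")),
        clock.callLater(10, lambda: log.append("SIGTERM")),
        clock.callLater(15, lambda: log.append("SIGKILL")),
    ]

    def cancel_stops():
        # Called when the process exits: drop every fallback still pending.
        for token in tokens:
            if token.active():
                token.cancel()

    return cancel_stops


clock = FakeClock()
log = []
cancel_stops = schedule_stop(clock, log)
clock.advance(6)    # "stop now" fires once the 5 second mark has passed
cancel_stops()      # process exited; SIGTERM and SIGKILL never run
clock.advance(20)
print(log)
```

Checking active() before cancel() mirrors the diff's __cancel_stops helper: a fallback that has already fired is skipped rather than cancelled.
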
Modified: asterisk/trunk/lib/python/asterisk/cdr.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/lib/python/asterisk/cdr.py?view=diff&rev=3132&r1=3131&r2=3132
==============================================================================
--- asterisk/trunk/lib/python/asterisk/cdr.py (original)
+++ asterisk/trunk/lib/python/asterisk/cdr.py Fri Mar 23 08:47:32 2012
@@ -15,6 +15,7 @@
 import csv
 import re
 import logging
+import time
 
 logger = logging.getLogger(__name__)
 
@@ -86,13 +87,17 @@
             return
 
         self.__records = []
+
+        cdr = None
         try:
             cdr = csv.DictReader(open(fn, "r"), AsteriskCSVCDRLine.get_fields(), ",")
-        except IOError:
-            logger.error("Failed to open CDR file '%s'" % (fn))
-            return
+        except IOError as (errno, strerror):
+            logger.debug("IOError %d[%s] while opening CDR file '%s'" % (errno, strerror, fn))
         except:
-            logger.error("Unexpected error: %s" % (sys.exc_info()[0]))
+            logger.debug("Unexpected error: %s" % (sys.exc_info()[0]))
+
+        if not cdr:
+            logger.error("Unable to open CDR file '%s'" % (fn))
             return
 
         for r in cdr:
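
The cdr.py change above logs the low-level failure at debug level and then reports a single error if no reader could be created. A rough sketch of that pattern follows. The load_cdr_rows function, the FIELDS list, and the temporary file are hypothetical and exist only for illustration. The sketch uses the "except IOError as err" form, which works on Python 2.6+ and Python 3, where the tuple-unpacking form in the diff is Python 2 only.

```python
import csv
import logging
import os
import tempfile

logger = logging.getLogger(__name__)

FIELDS = ["src", "dst", "disposition"]  # hypothetical subset of CDR columns


def load_cdr_rows(fn):
    """Return the CSV rows in fn as dicts, or an empty list if it cannot be read."""
    try:
        with open(fn, "r") as handle:
            return list(csv.DictReader(handle, FIELDS, delimiter=","))
    except IOError as err:
        # Details go to debug; the caller-facing message is logged once below.
        logger.debug("IOError %s[%s] while opening CDR file '%s'",
                     err.errno, err.strerror, fn)
    logger.error("Unable to open CDR file '%s'", fn)
    return []


# A path that does not exist takes the error branch.
missing = load_cdr_rows(os.path.join(tempfile.gettempdir(), "no-such-cdr-file.csv"))

# A small file written on the spot takes the success branch.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
    tmp.write("1001,1002,ANSWERED\n")
present = load_cdr_rows(tmp.name)
os.remove(tmp.name)
```
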

Modified: asterisk/trunk/lib/python/asterisk/sipp.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/lib/python/asterisk/sipp.py?view=diff&rev=3132&r1=3131&r2=3132
==============================================================================
--- asterisk/trunk/lib/python/asterisk/sipp.py (original)
+++ asterisk/trunk/lib/python/asterisk/sipp.py Fri Mar 23 08:47:32 2012
@@ -14,13 +14,77 @@
 import os
 import subprocess
 import logging
-
-from twisted.internet import reactor
+import TestSuiteUtils
+
+from twisted.internet import reactor, defer, utils, protocol
 from asterisk import Asterisk
 from TestCase import TestCase
 
 logger = logging.getLogger(__name__)
 
+class SIPpScenarioSequence:
+    """ Execute a series of SIPp scenarios in order.
+
+    This class manages the execution of multiple SIPpScenarios in sequence.
+    """
+
+    def __init__(self, test_case, sipp_scenarios = [], fail_on_any = False, intermediate_cb_fn = None, final_deferred = None):
+        """ Create a new sequence of scenarios
+
+        Keyword Arguments:
+        test_case - the TestCase derived object to pass to the SIPpScenario objects
+        sipp_scenarios - a list of SIPpScenario objects to execute
+        fail_on_any - if any scenario fails, stop the reactor and kill the test.
+        intermediate_cb_fn - a callback function suitable as a Deferred callback
+        that will be added to each test
+        final_deferred - a deferred object that will be called when all tests have
+        executed, but before the reactor is stopped
+        """
+        self.__sipp_scenarios = sipp_scenarios
+        self.__test_case = test_case
+        self.__fail_on_any = fail_on_any
+        self.__test_counter = 0
+        self.__intermediate_cb_fn = intermediate_cb_fn
+        self.__final_deferred = final_deferred
+
+    def register_scenario(self, sipp_scenario):
+        """ Register a new scenario with the sequence
+
+        Registers a SIPpScenario object with the sequence of scenarios to execute
+
+        Keyword Arguments:
+        sipp_scenario - the SIPpScenario object to execute
+        """
+        self.__sipp_scenarios.append(sipp_scenario)
+
+    def execute(self):
+        """ Execute the tests in sequence
+        """
+        def __execute_next(result):

[... 2467 lines stripped ...]
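
One thing to watch in the SIPpScenarioSequence constructor shown above: the default "sipp_scenarios = []" is evaluated once, when the function is defined, so instances created without an explicit list would share a single list, and register_scenario appends to it. A minimal illustration follows, using hypothetical SharedSequence and SafeSequence classes rather than the test suite's own. Whether any caller in the suite actually relies on the default is not something this excerpt shows.

```python
class SharedSequence(object):
    """Mirrors the constructor's shape: a list literal as the default value."""
    def __init__(self, scenarios=[]):
        self.scenarios = scenarios

    def register_scenario(self, scenario):
        self.scenarios.append(scenario)


class SafeSequence(object):
    """Uses None as the sentinel so each instance gets its own list."""
    def __init__(self, scenarios=None):
        self.scenarios = list(scenarios) if scenarios is not None else []

    def register_scenario(self, scenario):
        self.scenarios.append(scenario)


first, second = SharedSequence(), SharedSequence()
first.register_scenario("uac.xml")
print(second.scenarios)       # the scenario registered on `first` shows up here

safe_first, safe_second = SafeSequence(), SafeSequence()
safe_first.register_scenario("uac.xml")
print(safe_second.scenarios)  # stays empty
```
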


