[asterisk-commits] mjordan: branch mjordan/twisted_process r3098 - in /asterisk/team/mjordan/twi...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Mar 16 17:29:19 CDT 2012


Author: mjordan
Date: Fri Mar 16 17:29:14 2012
New Revision: 3098

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=3098
Log:
Check in semi-working Twisted management of the Asterisk process

This works, for varying definitions of work.  A few tests are probably going to
shred themselves in a blaze of asynchronous glory, but you can't make an omelette
without breaking a few eggs.

After this: SIPp!  PJSUA... not sure what to do with that, since we don't have a
wrapper class.  I may just steal Terry's work item and do that in conjunction with
this work.

Modified:
    asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py
    asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py
    asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py
    asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py
    asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py
    asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test
    asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test
    asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test
    asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test
    asterisk/team/mjordan/twisted_process/tests/mixmonitor/run-test
    asterisk/team/mjordan/twisted_process/tests/mixmonitor_audiohook_inherit/run-test

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py Fri Mar 16 17:29:14 2012
@@ -132,12 +132,23 @@
 
     def ami_test_done(self, ami, event):
         if event.get("event") == "Hangup":
-            if self.no_active_channels():
-                try:
-                    self.stop_reactor()
-                except ReactorNotRunning:
-                    # No problemo.
-                    pass
+            self.check_active_channels()
+
+    def check_active_channels(self):
+        """
+        Check to see if we have any active channels.  If we don't kill the reactor.
+        """
+        def __parse_output(result):
+            first_line = result.output.split('\n', 1)[0]
+            first_number = first_line.split(' ', 1)[0]
+            if first_number == '0':
+                logger.debug("No channels detected; stopping reactor")
+                self.stop_reactor()
+
+        for asterisk in self.ast:
+            # 0 active channels
+            cli_deferred = asterisk.cli_exec('core show channels count')
+            cli_deferred.addCallback(__parse_output)
 
     def run(self):
         """

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py Fri Mar 16 17:29:14 2012
@@ -12,6 +12,7 @@
 import unittest
 
 from TestConditions import TestCondition
+from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
@@ -32,9 +33,8 @@
             self.allowed_channels = test_config.config['allowedchannels']
 
     def evaluate(self, related_test_condition = None):
-        for ast in self.ast:
-            """ For logging / debug purposes, do a full core show channels """
-            channel_lines = ast.cli_exec('core show channels')
+        def __evaluate_channel_result(cli_object):
+            channel_lines = cli_object.output
             channel_tokens = channel_lines.strip().split('\n')
             active_channels = 0
             for token in channel_tokens:
@@ -44,6 +44,18 @@
             if active_channels > self.allowed_channels:
                 super(ChannelTestCondition, self).failCheck(
                     'Detected number of active channels %d is greater than the allowed %d on Asterisk %s' % (active_channels, self.allowed_channels, ast.host))
-        """ Set to pass if we haven't detected any failures """
-        super(ChannelTestCondition, self).passCheck()
 
+        def __pass_check(result):
+            super(ChannelTestCondition, self).passCheck()
+
+        deferred_list = []
+        for ast in self.ast:
+            """ For logging / debug purposes, do a full core show channels """
+            d = ast.cli_exec('core show channels')
+            d.addCallback(__evaluate_channel_result)
+            deferred_list.append(d)
+
+        dl = defer.DeferredList(deferred_list)
+        dl.addCallback(__pass_check)
+
+        return dl

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py Fri Mar 16 17:29:14 2012
@@ -13,7 +13,7 @@
 import os
 import datetime
 import time
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from starpy import manager, fastagi
 
 from asterisk import Asterisk
@@ -59,6 +59,7 @@
         self.testStateController = None
         self.pcap = None
         self.pcapfilename = None
+        self.__stopping = False
         self.testlogdir = os.path.join(Asterisk.test_suite_root, self.base, str(os.getpid()))
         self.ast_version = AsteriskVersion()
 
@@ -199,66 +200,110 @@
         return PcapListener(device, bpf_filter, dumpfile, self.__pcap_callback)
 
     def start_asterisk(self):
-        return
+        """ This is kept mostly for legacy support.  When the TestCase previously
+        used a synchronous, blocking mechanism independent of the twisted
+        reactor to spawn Asterisk, this method was called.  Now the Asterisk
+        instances are started immediately after the reactor has started.
+
+        Derived methods can still implement this and have it be called when
+        the reactor is running, but immediately before instances of Asterisk
+        are launched.
+        """
+        pass
 
     def __start_asterisk(self):
         """
         Start the instances of Asterisk that were previously created.  See
-        create_asterisk.  Note that this should be called before the reactor is
-        told to run.
-        """
+        create_asterisk.  Note that this should be the first thing called
+        when the reactor has started to run
+        """
+        def __perform_pre_checks(result):
+            """ Execute the pre-condition checks """
+            self.testConditionController.evaluate_pre_checks()
+            return result
+
+        def __run_callback(result):
+            """ Notify the test that we are running """
+            self.run()
+
+        def __check_success_failure(result):
+            """ Make sure the instances started properly """
+            for (success, value) in result:
+                if not success:
+                    logger.error(value)
+                    self.stop_reactor()
+            return result
+
+        # Call the method that derived objects can override
+        self.start_asterisk()
+
+        start_defers = []
         for index, item in enumerate(self.ast):
             logger.info("Starting Asterisk instance %d" % (index + 1))
-            self.ast[index].start()
-        self.testConditionController.evaluate_pre_checks()
+            temp_defer = self.ast[index].start()
+            start_defers.append(temp_defer)
+
+        d = defer.DeferredList(start_defers, consumeErrors=True)
+        d.addCallback(__check_success_failure)
+        d.addCallback(__perform_pre_checks)
+        d.addCallback(__run_callback)
 
     def stop_asterisk(self):
         """
-        Stop the instances of Asterisk that were previously started.  See
-        start_asterisk.  Note that this should be called after the reactor has
-        returned from its run.
-
-        If there were errors exiting asterisk, this function will return False.
-        """
+        Called when the instances of Asterisk are being stopped.  Note that previously,
+        this explicitly stopped the Asterisk instances: now they are stopped automatically
+        when the reactor is stopped.
+        """
+        pass
+
+    def __stop_asterisk(self):
+        """ Stops the instances of Asterisk.
+        
+        Stops the instances of Asterisk - called when stop_reactor is called.  This
+        returns a deferred object that can be used to be notified when all instances
+        of Asterisk have stopped.
+         """
+
+        def __check_success_failure(result):
+            """ Make sure the instances stopped properly """
+            for (success, value) in result:
+                if not success:
+                    logger.warning(value)
+                    # This should already be called when the reactor is being terminated
+                    # if we couldn't stop the instance of Asterisk, there isn't much else to do
+                    # here other then complain
+            return result
+
         res = True
         self.testConditionController.evaluate_post_checks()
+        stop_defers = []
         for index, item in enumerate(self.ast):
             logger.info("Stopping Asterisk instance %d" % (index + 1))
-            returncode = self.ast[index].stop()
-            if returncode < 0:
-                # XXX setting passed here might be overridden later in a
-                # derived class. This is bad.
-                self.passed = False
-                logger.error("Asterisk instance %d exited with signal %d" % (index + 1, abs(returncode)))
-                res = False
-            elif returncode > 0:
-                # XXX same here
-                self.passed = False
-                logger.error("Asterisk instance %d exited with non-zero return code %d" % (index + 1, returncode))
-                res = False
-
-        return res
-
-    def no_active_channels(self):
-        """
-        Return true if all our asterisk children have 0 active channels.
-        """
-        for asterisk in self.ast:
-            # 0 active channels
-            first_line = asterisk.cli_exec('core show channels count').split('\n', 1)[0]
-            # 0
-            first_number = first_line.split(' ', 1)[0]
-            if first_number != '0':
-                return False
-        return True
+            temp_defer = self.ast[index].stop()
+            stop_defers.append(temp_defer)
+
+        d = defer.DeferredList(stop_defers, consumeErrors=True)
+        d.addCallback(__check_success_failure)
+        return d
 
     def stop_reactor(self):
         """
         Stop the reactor and cancel the test.
         """
-        logger.info("Stopping Reactor")
-        if reactor.running:
-            reactor.stop()
+        def __stop_reactor(result):
+            """ Called when the Asterisk instances are stopped """
+            logger.info("Stopping Reactor")
+            if reactor.running:
+                try:
+                    reactor.stop()
+                except twisted.internet.error.ReactorNotRunning:
+                    # Meh.
+                    pass
+        if not self.__stopping:
+            logger.info("Stopping Asterisk instances")
+            df = self.__stop_asterisk()
+            df.addCallback(__stop_reactor)
+            self.__stopping = True
 
     def __reactor_timeout(self):
         """
@@ -269,10 +314,16 @@
         self.stop_reactor()
 
     def __run(self):
-        logger.debug("Reactor is running: %s" % str(reactor.running))
+        """
+        Private entry point called when the reactor first starts up.
+        This needs to first ensure that Asterisk is fully up and running before
+        moving on.
+        """
         if (self.ast):
             self.__start_asterisk()
-        self.run()
+        else:
+            # If no instances of Asterisk are needed, go ahead and just run
+            self.run()
 
     def run(self):
         """
@@ -314,7 +365,6 @@
         pass
 
     def __pcap_callback(self, packet):
-        logger.debug("Received packet: %s\n" % (packet,))
         self.pcap_callback(packet)
 
     def handleOriginateFailure(self, reason):

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py Fri Mar 16 17:29:14 2012
@@ -22,14 +22,14 @@
 from config import ConfigFile
 from version import AsteriskVersion
 
-from twisted.internet import reactor, protocol, threads
+from twisted.internet import reactor, protocol, threads, defer
 from twisted.internet import utils
 
 logger = logging.getLogger(__name__)
 
 class AsteriskCliCommand():
     """
-    Manages an Asterisk CLI command.  This is a blocking operation
+    Class that manages an Asterisk CLI command.
     """
 
     def __init__(self, host, cmd):
@@ -46,59 +46,82 @@
         self.err = ""
 
     def execute(self):
-        df = threads.deferToThread(self.__execute)
-        df.addErrback(self.__thread_err)
-
-    def __execute(self):
-        logger.debug("Spawning command")
+        """
+        Execute the CLI command.
+
+        Returns a deferred that will be called when the operation completes.  The
+        parameter to the deferred is this object.
+        """
+        def __cli_output_callback(result):
+            """ Handle regular output from the Asterisk CLI instance """
+            self.__set_properties(result)
+            logger.debug("Asterisk CLI %s exited %d" % (self.__host, self.exitcode))
+            if self.exitcode:
+                self.__deferred.errback(self)
+            else:
+                self.__deferred.callback(self)
+    
+        def __cli_error_callback(result):
+            """ Handle an error coming back from the Asterisk CLI instance """
+            self.__set_properties(result)
+            logger.warning("Asterisk CLI %s exited %d with error: %s" % (self.__host, self.exitcode, self.err))
+            self.__deferred.errback(self)
+
+        self.__deferred = defer.Deferred()
         df = utils.getProcessOutputAndValue(self.__cmd[0], self.__cmd)
-        df.addCallbacks(self.__cli_output_callback, self.__cli_error_callback)
-
-    def __cli_output_callback(self, result):
-        #self.transport.loseConnection()
-        logger.debug("Asterisk CLI %s exited %s" % (self.__host, str(result)))
-        self.__set_properties(result)
-
-    def __cli_error_callback(self, result):
-        #self.transport.loseConnection()
-        self.__set_properties(result)
-        logger.debug("Asterisk CLI %s exited %s" % (self.__host, str(result)))
-        logger.warning("Asterisk CLI %s exited %d with error: %s" % (self.__host, self.exitcode, self.err))
-
-    def __thread_err(self, result):
-        logger.warning("Asterisk CLI %s failed to spawn properly: %s" % (self.__host, str(result)))
+        df.addCallback(__cli_output_callback)
+        df.addErrback(__cli_error_callback)
+
+        return self.__deferred
 
     def __set_properties(self, result):
+        """ Set the properties based on the result of the getProcessOutputAndValue call """
         out, err, code = result
         self.exitcode = code
         self.output = out
         self.err = err
 
 class AsteriskProtocol(protocol.ProcessProtocol):
-    def __init__(self, host):
+    """
+    Class that manages an Asterisk instance
+    """
+
+    def __init__(self, host, stop_deferred):
+        """ Create an AsteriskProtocol object
+
+        Create an AsteriskProtocol object, which manages the interactions with
+        the Asterisk process
+
+        Keyword Arguments:
+        host - the hostname or address of the Asterisk instance
+        stop_deferred - a twisted Deferred object that will be called when the
+        process has exited
+        """
+
         self.output = ""
         self.__host = host
         self.exitcode = 0
         self.exited = False
+        self.__stop_deferred = stop_deferred
 
     def outReceived(self, data):
+        """ Override of ProcessProtocol.outReceived """
         logger.debug("Asterisk %s received: %s" % (self.__host, data))
         self.output += data
 
     def connectionMade(self):
+        """ Override of ProcessProtocol.connectionMade """
         logger.debug("Asterisk %s - connection made" % (self.__host))
 
     def errReceived(self, data):
+        """ Override of ProcessProtocol.errReceived """
         logger.warn("Asterisk %s received error: %s" % (self.__host, data))
 
-    def processExited(self, reason):
-        logger.debug("Asterisk %s exited with code %d" % (self.__host, reason.value.exitCode,))
-        self.exitcode = reason.value.exitCode
-        self.exitcode = True
-
     def processEnded(self, reason):
+        """ Override of ProcessProtocol.processEnded """
         logger.debug("Asterisk %s ended with code %d" % (self.__host, reason.value.exitCode,))
         self.exitcode = reason.value.exitCode
+        self.__stop_deferred.callback("Asterisk %s ended with code %d" % (self.__host, reason.value.exitCode,))
         self.exited = True
 
 class Asterisk:
@@ -181,9 +204,11 @@
                     self.directories[var] = val
 
     def start(self):
-        """Start this instance of Asterisk.
-
-        This function starts up this instance of Asterisk.
+        """ Start this instance of Asterisk.
+
+        Returns:
+        A deferred object that is called when Asterisk is fully booted, or an
+        error if Asterisk fails to start
 
         Example Usage:
         asterisk.start()
@@ -191,6 +216,20 @@
         Note that calling this will install the default testsuite
         config files, if they have not already been installed
         """
+
+        def __wait_fully_booted_callback(cli_command):
+            self.__start_deferred.callback("Successfully started Asterisk %s" % self.host)
+    
+        def __wait_fully_booted_error(cli_command):
+            if time.time() - self.__start_asterisk_time > 5:
+                logger.error("Asterisk core waitfullybooted for %s failed" % self.host)
+                self.__start_deferred.errback("Command core waitfullybooted failed")
+            else:
+                logger.debug("Asterisk core waitfullybooted failed, attempting again...")
+                cli_deferred = self.cli_exec("core waitfullybooted")
+                cli_deferred.addCallback(__wait_fully_booted_callback)
+                cli_deferred.addErrback(__wait_fully_booted_error)
+
         self.install_configs(os.getcwd() + "/configs")
         self.__setup_configs()
 
@@ -200,75 +239,120 @@
             "-C", "%s" % os.path.join(self.astetcdir, "asterisk.conf")
         ]
 
-        self.processProtocol = AsteriskProtocol(self.host)
+        self.__start_deferred = defer.Deferred()
+        self.__stop_deferred = defer.Deferred()
+        self.processProtocol = AsteriskProtocol(self.host, self.__stop_deferred)
         self.process = reactor.spawnProcess(self.processProtocol, cmd[0], cmd)
 
-        # Be _really_ sure that Asterisk has started up before returning.
-
-        # Poll the instance to make sure we created it successfully
-        if (self.processProtocol.exited):
-            """ Rut roh, Asterisk process exited prematurely """
-            logger.error("Asterisk instance %s exited prematurely with return code %d" % (self.host, self.processProtocol.exitcode))
-
-        start = time.time()
-        for i in xrange(5):
-            time.sleep(1.0)
-            # This command should stall until completed, but if an
-            # exception occurs, it returns the empty string.
-            if self.cli_exec("core waitfullybooted", warn_on_fail=False):
-                break;
-
+        # Begin the wait fully booted cycle
+        self.__start_asterisk_time = time.time()
+        cli_deferred = self.cli_exec("core waitfullybooted")
+        cli_deferred.addCallback(__wait_fully_booted_callback)
+        cli_deferred.addErrback(__wait_fully_booted_error)
+        return self.__start_deferred
 
     def stop(self):
         """Stop this instance of Asterisk.
 
         This function is used to stop this instance of Asterisk.
 
+        Returns a deferred that can be used to detect when Asterisk exits,
+        or if it fails to exit
+
         Example Usage:
         asterisk.stop()
         """
+
+        def __send_stop_gracefully():
+            if self.ast_version < AsteriskVersion("1.6.0"):
+                cli_deferred = self.cli_exec("stop gracefully")
+            else:
+                cli_deferred = self.cli_exec("core stop gracefully")
+            return cli_deferred
+
+        def __send_stop_now():
+            if self.ast_version < AsteriskVersion("1.6.0"):
+                cli_deferred = self.cli_exec("stop now")
+            else:
+                cli_deferred = self.cli_exec("core stop now")
+            return cli_deferred
+
+        def __stop_gracefully_callback(cli_command):
+            logger.debug("Successfully stopped Asterisk %s" % self.host)
+
+        def __stop_gracefully_error(cli_command):
+            if time.time() - self.__stop_time > 5:
+                logger.warning("Asterisk graceful stop for %s failed, attempting stop now" % self.host)
+                self.__stop_time = time.time()
+                cli_deferred = __send_stop_now()
+                cli_deferred.addCallback(__stop_now_callback)
+                cli_deferred.addErrback(__stop_now_error)
+            else:
+                logger.debug("Asterisk graceful stop failed, attempting again...")
+                cli_deferred = __send_stop_gracefully()
+                cli_deferred.addCallback(__stop_gracefully_callback)
+                cli_deferred.addErrback(__stop_gracefully_error)
+
+        def __stop_now_callback(cli_command):
+            logger.debug("Successfully stopped Asterisk %s" % self.host)
+
+        def __stop_now_error(cli_command):
+            if time.time() - self.__stop_time > 5:
+                logger.warning("Asterisk  stop now for %s failed, attempting TERM" % self.host)
+                # If even a "stop now" didn't do the trick, fall back to sending
+                # signals to the process.  First, send a SIGTERM.  If it _STILL_ hasn't
+                # gone away after another 5 seconds, send SIGKILL.
+                try:
+                    self.process.signalProcess("TERM")
+                    if not self.processProtocol.exited:
+                        reactor.callLater(5, __check_process_and_kill)
+                except twisted.internet.error.ProcessExitedAlready:
+                    # Probably that we sent a signal to a process that was already
+                    # dead.  Just ignore it.
+                    pass
+                logger.debug("Successfully terminated Asterisk %s" % self.host)
+            else:
+                logger.debug("Asterisk stop now failed, attempting again...")
+                cli_deferred = __send_stop_now()
+                cli_deferred.addCallback(__stop_now_callback)
+                cli_deferred.addErrback(__stop_now_error)
+
+        def __check_process_and_kill():
+            try:
+                if not self.processProtocol.exited:
+                    logger.info("Sending KILL to Asterisk %s" % self.host)
+                    self.process.signalProcess("KILL")
+            except twisted.internet.error.ProcessExitedAlready:
+                # Pass on this
+                pass
+            # If you kill the process, the ProcessProtocol will never get the note that its dead
+            try:
+                self.process.loseConnection()
+            except:
+                pass
+            self.__stop_deferred.callback("Asterisk %s KILLED" % self.host)
+
+        def __cancel_kill(reason):
+            try:
+                self.__cancel_token.cancel()
+            except ValueError:
+                pass
+            return reason
+
         # Start by asking to stop gracefully.
-        if self.ast_version < AsteriskVersion("1.6.0"):
-            self.cli_exec("stop gracefully")
-        else:
-            self.cli_exec("core stop gracefully")
-        for i in xrange(5):
-            time.sleep(1.0)
-            if self.processProtocol.exited:
-                return self.process.returncode
-
-        # If the graceful shutdown did not complete within 5 seconds, ask
-        # Asterisk to stop right now.
-        if self.ast_version < AsteriskVersion("1.6.0"):
-            self.cli_exec("stop now")
-        else:
-            self.cli_exec("core stop now")
-        for i in xrange(5):
-            time.sleep(1.0)
-            if self.processProtocol.exited:
-                return self.processProtocol.exitcode
-
-        # If even a "stop now" didn't do the trick, fall back to sending
-        # signals to the process.  First, send a SIGTERM.  If it _STILL_ hasn't
-        # gone away after another 5 seconds, send SIGKILL.
-        try:
-            self.process.signalProcess("TERM")
-            for i in xrange(5):
-                time.sleep(1.0)
-                if self.processProtocol.exited:
-                    return self.processProtocol.exitcode
-            self.process.signalProcess("KILL")
-        except twisted.internet.error.ProcessExitedAlready:
-            # Probably that we sent a signal to a process that was already
-            # dead.  Just ignore it.
-            pass
-
-        # If we get to this point and Asterisk still hasn't exited, there isn't much
-        # to do other then flag it
-        if not self.processProtocol.exited:
-            logger.error("Asterisk instance %s failed to exit" % self.host)
-
-        return self.processProtocol.exitcode
+        self.__stop_time = time.time()
+        d = __send_stop_gracefully()
+        d.addCallback(__stop_gracefully_callback)
+        d.addErrback(__stop_gracefully_error)
+
+        # There's a chance that we'll get back a 'yes, I'll stop' from Asterisk, and then it
+        # just won't.  Add a 10 second callback to the reactor to terminate the Asterisk instance
+        # with prejudice if it lies to us.  That should be more then enough time for one of the
+        # previous methods to stop it in a nicer fashion.
+        self.__cancel_token = reactor.callLater(10, __check_process_and_kill)
+        self.__stop_deferred.addCallback(__cancel_kill)
+
+        return self.__stop_deferred
 
     def install_configs(self, cfg_path):
         """Installs all files located in the configuration directory for this
@@ -389,6 +473,9 @@
         Keyword Arguments:
         blocking -- When True, do not return from this function until the CLI
                     command finishes running.  The default is True.
+
+        Returns:
+        A deferred object that can be used to listen for command completion
 
         Example Usage:
         asterisk.originate("Local/a_exten@context extension b_exten@context")
@@ -412,17 +499,20 @@
             <tech/data> extension <exten>@<context>"
 
         if self.ast_version < AsteriskVersion("1.6.2"):
-            self.cli_exec("originate %s" % argstr, blocking=blocking)
+            return self.cli_exec("originate %s" % argstr, blocking=blocking)
         else:
-            self.cli_exec("channel originate %s" % argstr, blocking=blocking)
+            return self.cli_exec("channel originate %s" % argstr, blocking=blocking)
 
     def cli_exec(self, cli_cmd, blocking=True, warn_on_fail=True):
         """Execute a CLI command on this instance of Asterisk.
 
         Keyword Arguments:
         cli_cmd -- The command to execute.
-        blocking -- When True, do not return from this function until the CLI
-                    command finishes running.  The default is True.
+        blocking -- When True, the process is spawned as a blocking
+        operation, at least as well as we can with twisted.
+
+        Returns:
+        A deferred object that will be signaled when the process has exited
 
         Example Usage:
         asterisk.cli_exec("core set verbose 10")
@@ -438,18 +528,13 @@
         logger.debug("Executing %s ..." % cmd)
 
         if not blocking:
-            cliProtocol = AsteriskProtocol(("CLI:%s" % self.host))
+            stop_deferred = defer.Deferred()
+            cliProtocol = AsteriskProtocol(("CLI:%s" % self.host), stop_deferred)
             process = reactor.spawnProcess(cliProtocol, cmd[0], cmd)
-            return ""
+            return stop_deferred
 
         cliProtocol = AsteriskCliCommand(self.host, cmd)
-        cliProtocol.execute()
-        for i in range(0, 5):
-            if cliProtocol.exitcode >= 0:
-                return cliProtocol.output
-            else:
-                time.sleep(1.0)
-                logger.debug("Checking again")
+        return cliProtocol.execute()
 
     def __make_directory_structure(self):
         """ Mirror system directory structure """

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py Fri Mar 16 17:29:14 2012
@@ -15,6 +15,7 @@
 import csv
 import re
 import logging
+import time
 
 logger = logging.getLogger(__name__)
 
@@ -86,13 +87,23 @@
             return
 
         self.__records = []
-        try:
-            cdr = csv.DictReader(open(fn, "r"), AsteriskCSVCDRLine.get_fields(), ",")
-        except IOError:
-            logger.error("Failed to open CDR file '%s'" % (fn))
-            return
-        except:
-            logger.error("Unexpected error: %s" % (sys.exc_info()[0]))
+
+        cdr = None
+        for i in range(0, 5):
+            # So... sometimes the CDR tests can exit really, really fast.  If that happens,
+            # the CSV can sometimes not exist yet - attempt opening it a few times, and only
+            # error out if we fail a few times
+            try:
+                cdr = csv.DictReader(open(fn, "r"), AsteriskCSVCDRLine.get_fields(), ",")
+                break
+            except IOError as (errno, strerror):
+                logger.debug("IOError %d[%s] while opening CDR file '%s'" % (errno, strerror, fn))
+            except:
+                logger.debug("Unexpected error: %s" % (sys.exc_info()[0]))
+            time.sleep(1.0)
+
+        if not cdr:
+            logger.error("Unable to open CDR file '%s'" % (fn))
             return
 
         for r in cdr:

Modified: asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test (original)
+++ asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test Fri Mar 16 17:29:14 2012
@@ -62,9 +62,9 @@
             self.voicemailManager.createDummyVoicemail("default", "1234", self.voicemailManager.oldFolderName, i, self.formats)
 
         ami.registerEvent('UserEvent', self.user_event)
-        self.ast[ami.id].cli_exec("dialplan set global EXPECTED_NEW 15")
-        self.ast[ami.id].cli_exec("dialplan set global EXPECTED_URGENT 5")
-        self.ast[ami.id].cli_exec("dialplan set global EXPECTED_OLD 15")
+        ami.setVar(channel = "", variable = "EXPECTED_NEW", value = "15")
+        ami.setVar(channel = "", variable = "EXPECTED_URGENT", value = "5")
+        ami.setVar(channel = "", variable = "EXPECTED_OLD", value = "15")
 
         df1 = ami.originate("Local/s@listener", "voicemail", "1234", 1)
         df1.addErrback(self.handleOriginateFailure)

Modified: asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test (original)
+++ asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test Fri Mar 16 17:29:14 2012
@@ -83,6 +83,9 @@
         self.ami = ami
         self.ami.registerEvent('ChanSpyStart', self.chanspyEvent)
 
+        self.ami.setVar(channel = "", variable = "TESTAUDIO1", value = self.audiofile1)
+        self.ami.setVar(channel = "", variable = "TALK_AUDIO", value = self.talkingaudio)
+
     def amiLoginError(self, ami):
         self.logLastStep("AMI login failed")
         reactor.callLater(1, self.readResult)
@@ -115,9 +118,6 @@
         playfilearg = "--play-file=%s.wav" % (self.talkingaudio)
         self.pja = subprocess.Popen(['pjsua', '--local-port=5065', '--auto-answer=200', '--null-audio', '--auto-loop'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
         self.pjb = subprocess.Popen(['pjsua', '--local-port=5066', '--auto-answer=200', playfilearg, '--null-audio', '--auto-play'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-
-        self.ast1.cli_exec("core set global TESTAUDIO1 " + self.audiofile1)
-        self.ast1.cli_exec("core set global TALK_AUDIO " + self.talkingaudio)
 
     def stopProcesses(self):
         self.logLastStep("Stopping Processes")

Modified: asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test (original)
+++ asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test Fri Mar 16 17:29:14 2012
@@ -79,6 +79,8 @@
         self.logLastStep("Connected to the AMI")
         self.ami = ami
         self.ami.registerEvent('ChanSpyStart', self.chanspyEvent)
+        self.ami.setVar(channel = "", variable = "TESTAUDIO1", value = self.audiofile1)
+        self.ami.setVar(channel = "", variable = "TALK_AUDIO", value = self.talkingaudio)
 
     def amiLoginError(self, ami):
         self.logLastStep("AMI login failed")
@@ -119,9 +121,6 @@
         self.pja = subprocess.Popen(['pjsua', '--local-port=5065', '--auto-answer=200', '--null-audio'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
         self.pjb = subprocess.Popen(['pjsua', '--local-port=5066', '--auto-answer=200', '--null-audio'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
         self.pjc = subprocess.Popen(['pjsua', '--local-port=5067', '--auto-answer=200', '--null-audio'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-
-        self.ast1.cli_exec("core set global TESTAUDIO1 " + self.audiofile1)
-        self.ast1.cli_exec("core set global TALK_AUDIO " + self.talkingaudio)
 
     def stopProcesses(self):
         self.logLastStep("Stopping Processes")

Modified: asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test (original)
+++ asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test Fri Mar 16 17:29:14 2012
@@ -9,144 +9,136 @@
 
 import sys
 import os
+import logging
 
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 
 sys.path.append("lib/python")
 from asterisk.asterisk import Asterisk
 from asterisk.TestCase import TestCase
 
+logger = logging.getLogger(__name__)
 
 class moduleLoadUnloadTest(TestCase):
     def __init__(self):
         TestCase.__init__(self)
         self.create_asterisk()
 
+    def check_file(self, module):
+        module[1] = (os.path.isfile(self.ast[0].base + "/usr/lib/asterisk/modules/" + module[0]))
+        return module[1]
+
+    def pre_load_next(self):
+        self.__preload_test_counter += 1
+        if (self.__preload_test_counter < len(self.preload_res)):
+            self.pre_load_module(self.preload_res[self.__preload_test_counter])
+        else:
+            self.load_next()
+
+    def pre_load_module(self, module):
+        def __module_callback(reason):
+            text = "Loaded " + self.__module[0]
+            if reason.output.count(text) > 0:
+                self.__module[2] = True
+            self.pre_load_next()
+
+        if not self.check_file(module):
+            logger.info(module[0] + " does not exist! Skipping this test.")
+            self.pre_load_next()
+
+        df = self.ast[0].cli_exec("module load " + module[0])
+        df.addCallback(__module_callback)
+        self.__module = module
+
+    def load_next(self):
+        self.__load_test_counter += 1
+        if (self.__load_test_counter < len(self.res)):
+            self.load_module(self.res[self.__load_test_counter])
+        else:
+            self.unload_next()
+
     def load_module(self, module):
+        def __module_callback(reason):
+            text = "Loaded " + self.__module[0]
+            if reason.output.count(text) > 0:
+                self.__module[2] = True
+            self.load_next()
+
         if not self.check_file(module):
-            print module[0] + " does not exist! Skipping test."
-            return False
+            logger.info(module[0] + " does not exist! Skipping test.")
+            self.load_next()
 
-        text = "Loaded " + module[0]
-        res = self.ast[0].cli_exec("module load " + module[0])
-        if res.count(text) > 0:
-            module[2] = True
-        return module[2]
+        df = self.ast[0].cli_exec("module load " + module[0])
+        df.addCallback(__module_callback)
+        self.__module = module
+
+    def unload_next(self):
+        self.__unload_test_counter += 1
+        if (self.__unload_test_counter < len(self.unloads)):
+            self.unload_module(self.unloads[self.__unload_test_counter])
+        else:
+            if not self.__checking_results:
+                self.__checking_results = True
+                self.check_results()
 
     def unload_module(self, module):
+        def __module_callback(reason):
+            test = "Unloaded " + self.__module[0]
+            if reason.output.count(text) > 0:
+                self.__module[3] = True
+            self.unload_next()
+

[... 199 lines stripped ...]



More information about the asterisk-commits mailing list