[asterisk-commits] mjordan: branch mjordan/twisted_process r3092 - in /asterisk/team/mjordan/twi...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Mar 14 10:54:51 CDT 2012

Author: mjordan
Date: Wed Mar 14 10:54:47 2012
New Revision: 3092

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=3092
Initial migration of python subprocess to twisted

And its going so well too.

    asterisk/team/mjordan/twisted_process/   (props changed)
      - copied from r3077, asterisk/trunk/
      - copied unchanged from r3077, asterisk/trunk/lib/python/asterisk/utils.py

Propchange: asterisk/team/mjordan/twisted_process/
    automerge = *

Propchange: asterisk/team/mjordan/twisted_process/
    reviewboard:url = https://reviewboard.asterisk.org

Propchange: asterisk/team/mjordan/twisted_process/
--- svn:ignore (added)
+++ svn:ignore Wed Mar 14 10:54:47 2012
@@ -1,0 +1,2 @@

Propchange: asterisk/team/mjordan/twisted_process/
--- svn:mergeinfo (added)
+++ svn:mergeinfo Wed Mar 14 10:54:47 2012
@@ -1,0 +1,2 @@

Propchange: asterisk/team/mjordan/twisted_process/
    svnmerge-integrated = /asterisk/trunk:1-2945

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py?view=diff&rev=3092&r1=3077&r2=3092
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py Wed Mar 14 10:54:47 2012
@@ -84,7 +84,7 @@
         logger.info("Executing " + self.test_name)
-        reactor.callWhenRunning(self.run)
+        reactor.callWhenRunning(self.__run)
     def __setup_conditions(self):
@@ -199,6 +199,9 @@
         return PcapListener(device, bpf_filter, dumpfile, self.__pcap_callback)
     def start_asterisk(self):
+        return
+    def __start_asterisk(self):
         Start the instances of Asterisk that were previously created.  See
         create_asterisk.  Note that this should be called before the reactor is
@@ -265,6 +268,12 @@
         logger.warning("Reactor timeout: '%s' seconds" % self.reactor_timeout)
+    def __run(self):
+        logger.debug("Reactor is running: %s" % str(reactor.running))
+        if (self.ast):
+            self.__start_asterisk()
+        self.run()
     def run(self):
         Base implementation of the test execution method, run.  Derived classes

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestConfig.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestConfig.py?view=diff&rev=3092&r1=3077&r2=3092
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestConfig.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestConfig.py Wed Mar 14 10:54:47 2012
@@ -12,14 +12,13 @@
 import sys
 import os
 import subprocess
-import optparse
-import time
 import yaml
 import socket
-import utils
+import TestSuiteUtils
 from version import AsteriskVersion
 from asterisk import Asterisk
 from buildoptions import AsteriskBuildOptions
@@ -107,7 +106,7 @@
         self.met = False
         if "app" in dep:
             self.name = dep["app"]
-            self.met = utils.which(self.name) is not None
+            self.met = TestSuiteUtils.which(self.name) is not None
         elif "python" in dep:
             self.name = dep["python"]
@@ -184,7 +183,7 @@
         we run pjsua --help and parse the output to determine if --ipv6
         is a valid option
-        if utils.which('pjsua') is None:
+        if TestSuiteUtils.which('pjsua') is None:
             return False
         help_output = subprocess.Popen(['pjsua', '--help'],

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py?view=diff&rev=3092&r1=3077&r2=3092
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py Wed Mar 14 10:54:47 2012
@@ -13,17 +13,93 @@
 import sys
 import os
-import signal
 import time
 import shutil
-import subprocess
-import utils
 import logging
+import TestSuiteUtils
 from config import ConfigFile
 from version import AsteriskVersion
+from twisted.internet import reactor, protocol, threads
+from twisted.internet import utils
 logger = logging.getLogger(__name__)
+class AsteriskCliCommand():
+    """
+    Manages an Asterisk CLI command.  This is a blocking operation
+    """
+    def __init__(self, host, cmd):
+        """
+        Create a new Asterisk CLI Protocol instance
+        host    The host this CLI instance will connect to
+        cmd     The command to spawn (Asterisk instance, with command line parameters)
+        """
+        self.__host = host
+        self.__cmd = cmd
+        self.exitcode = -1
+        self.output = ""
+        self.err = ""
+    def execute(self):
+        df = threads.deferToThread(self.__execute)
+        df.addErrback(self.__thread_err)
+    def __execute(self):
+        logger.debug("Spawning command")
+        df = utils.getProcessOutputAndValue(self.__cmd[0], self.__cmd)
+        df.addCallbacks(self.__cli_output_callback, self.__cli_error_callback)
+    def __cli_output_callback(self, result):
+        #self.transport.loseConnection()
+        logger.debug("Asterisk CLI %s exited %s" % (self.__host, str(result)))
+        self.__set_properties(result)
+    def __cli_error_callback(self, result):
+        #self.transport.loseConnection()
+        self.__set_properties(result)
+        logger.debug("Asterisk CLI %s exited %s" % (self.__host, str(result)))
+        logger.warning("Asterisk CLI %s exited %d with error: %s" % (self.__host, self.exitcode, self.err))
+    def __thread_err(self, result):
+        logger.warning("Asterisk CLI %s failed to spawn properly: %s" % (self.__host, str(result)))
+    def __set_properties(self, result):
+        out, err, code = result
+        self.exitcode = code
+        self.output = out
+        self.err = err
+class AsteriskProtocol(protocol.ProcessProtocol):
+    def __init__(self, host):
+        self.output = ""
+        self.__host = host
+        self.exitcode = 0
+        self.exited = False
+    def outReceived(self, data):
+        logger.debug("Asterisk %s received: %s" % (self.__host, data))
+        self.output += data
+    def connectionMade(self):
+        logger.debug("Asterisk %s - connection made" % (self.__host))
+    def errReceived(self, data):
+        logger.warn("Asterisk %s received error: %s" % (self.__host, data))
+    def processExited(self, reason):
+        logger.debug("Asterisk %s exited with code %d" % (self.__host, reason.value.exitCode,))
+        self.exitcode = reason.value.exitCode
+        self.exitcode = True
+    def processEnded(self, reason):
+        logger.debug("Asterisk %s ended with code %d" % (self.__host, reason.value.exitCode,))
+        self.exitcode = reason.value.exitCode
+        self.exited = True
 class Asterisk:
     """An instance of Asterisk.
@@ -68,7 +144,7 @@
         if base is not None:
             self.base = "%s/%s" % (self.base, base)
         self.astetcdir = Asterisk.asterisk_etc_directory
-        self.ast_binary = utils.which("asterisk") or "/usr/sbin/asterisk"
+        self.ast_binary = TestSuiteUtils.which("asterisk") or "/usr/sbin/asterisk"
         self.host = host
         self.__ast_conf_options = ast_conf_options
@@ -123,32 +199,25 @@
             "-f", "-g", "-q", "-m", "-n",
             "-C", "%s" % os.path.join(self.astetcdir, "asterisk.conf")
-        try:
-            self.process = subprocess.Popen(cmd)
-        except OSError:
-            logger.error("Failed to execute command: %s" % str(cmd))
-            return False
+        self.processProtocol = AsteriskProtocol(self.host)
+        self.process = reactor.spawnProcess(self.processProtocol, cmd[0], cmd)
         # Be _really_ sure that Asterisk has started up before returning.
         # Poll the instance to make sure we created it successfully
-        self.process.poll()
-        if self.process.returncode != None:
+        if (self.processProtocol.exited):
             """ Rut roh, Asterisk process exited prematurely """
-            logger.error("Asterisk instance %s exited prematurely with return code %d" % (self.host, self.process.returncode))
+            logger.error("Asterisk instance %s exited prematurely with return code %d" % (self.host, self.processProtocol.exitcode))
         start = time.time()
-        while True:
+        for i in xrange(5):
+            time.sleep(1.0)
             # This command should stall until completed, but if an
             # exception occurs, it returns the empty string.
-            if not self.cli_exec("core waitfullybooted", warn_on_fail=False):
-                if time.time() - start > 5:
-                    logger.error("Unknown state of asterisk. Stopping waitfullybooted...")
-                    break
-                logger.debug("Attempting waitfullybooted again...")
-            else:
-                # We're fully booted...
-                break
+            if self.cli_exec("core waitfullybooted", warn_on_fail=False):
+                break;
     def stop(self):
         """Stop this instance of Asterisk.
@@ -165,11 +234,8 @@
             self.cli_exec("core stop gracefully")
         for i in xrange(5):
-            if self.process.poll() is not None:
+            if self.processProtocol.exited:
                 return self.process.returncode
-        # Check for locks
-        self.cli_exec("core show locks")
         # If the graceful shutdown did not complete within 5 seconds, ask
         # Asterisk to stop right now.
@@ -179,29 +245,30 @@
             self.cli_exec("core stop now")
         for i in xrange(5):
-            if self.process.poll() is not None:
-                return self.process.returncode
+            if self.processProtocol.exited:
+                return self.processProtocol.exitcode
         # If even a "stop now" didn't do the trick, fall back to sending
         # signals to the process.  First, send a SIGTERM.  If it _STILL_ hasn't
         # gone away after another 5 seconds, send SIGKILL.
-            os.kill(self.process.pid, signal.SIGTERM)
+            self.process.signalProcess("TERM")
             for i in xrange(5):
-                if self.process.poll() is not None:
-                    return self.process.returncode
-            os.kill(self.process.pid, signal.SIGKILL)
-        except OSError:
+                if self.processProtocol.exited:
+                    return self.processProtocol.exitcode
+            self.process.signalProcess("KILL")
+        except twisted.internet.error.ProcessExitedAlready:
             # Probably that we sent a signal to a process that was already
             # dead.  Just ignore it.
-        # We have done everything we can do at this point.  Wait for the
-        # process to exit.
-        self.process.wait()
-        return self.process.returncode
+        # If we get to this point and Asterisk still hasn't exited, there isn't much
+        # to do other then flag it
+        if not self.processProtocol.exited:
+            logger.error("Asterisk instance %s failed to exit" % self.host)
+        return self.processProtocol.exitcode
     def install_configs(self, cfg_path):
         """Installs all files located in the configuration directory for this
@@ -371,31 +438,18 @@
         logger.debug("Executing %s ..." % cmd)
         if not blocking:
-            process = subprocess.Popen(cmd)
+            cliProtocol = AsteriskProtocol(("CLI:%s" % self.host))
+            process = reactor.spawnProcess(cliProtocol, cmd[0], cmd)
             return ""
-        try:
-            process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
-                                       stderr=subprocess.STDOUT)
-        except OSError:
-            warn("Failed to execute command: %s" % str(cmd))
-            return ""
-        output = ""
-        try:
-            for l in process.stdout.readlines():
-                logger.debug(l.rstrip())
-                output += l
-        except IOError:
-            pass
-        try:
-            res = process.wait()
-            if res != None and res != 0:
-                warn("Exited non-zero [%d] while executing command %s" % (res, str(cmd)))
-                output = ""
-        except OSError:
-            pass
-        return output
+        cliProtocol = AsteriskCliCommand(self.host, cmd)
+        cliProtocol.execute()
+        for i in range(0, 5):
+            if cliProtocol.exitcode >= 0:
+                return cliProtocol.output
+            else:
+                time.sleep(1.0)
+                logger.debug("Checking again")
     def __make_directory_structure(self):
         """ Mirror system directory structure """

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/sippversion.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/sippversion.py?view=diff&rev=3092&r1=3077&r2=3092
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/sippversion.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/sippversion.py Wed Mar 14 10:54:47 2012
@@ -8,13 +8,12 @@
 the GNU General Public License Version 2.
-import re
 import subprocess
 import sys
 import unittest
-import utils
+import TestSuiteUtils
 class SIPpVersion:
     """A SIPp Version.
@@ -35,7 +34,7 @@
         self.pcap = False
         if version is None and feature is None:
-            sipp = utils.which("sipp")
+            sipp = TestSuiteUtils.which("sipp")
             if sipp is None:

Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/version.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/version.py?view=diff&rev=3092&r1=3077&r2=3092
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/version.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/version.py Wed Mar 14 10:54:47 2012
@@ -11,12 +11,12 @@
 the GNU General Public License Version 2.
-import sys
 import re
 import unittest
 import logging
 import subprocess
-import utils
+import TestSuiteUtils
 logger = logging.getLogger(__name__)
@@ -137,7 +137,7 @@
         if not hasattr(cls, "_asterisk_version_from_binary"):
             version = ""
-            ast_binary = utils.which("asterisk") or "/usr/sbin/asterisk"
+            ast_binary = TestSuiteUtils.which("asterisk") or "/usr/sbin/asterisk"
             cmd = [

Modified: asterisk/team/mjordan/twisted_process/runtests.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/runtests.py?view=diff&rev=3092&r1=3077&r2=3092
--- asterisk/team/mjordan/twisted_process/runtests.py (original)
+++ asterisk/team/mjordan/twisted_process/runtests.py Wed Mar 14 10:54:47 2012
@@ -24,7 +24,7 @@
 from asterisk.version import AsteriskVersion
 from asterisk.asterisk import Asterisk
 from asterisk.TestConfig import Dependency, TestConfig
-from asterisk import utils
+from asterisk import TestSuiteUtils
 TESTS_CONFIG = "tests.yaml"
 TEST_RESULTS = "asterisk-test-suite-report.xml"

More information about the asterisk-commits mailing list