[asterisk-commits] mjordan: branch mjordan/twisted_process r3098 - in /asterisk/team/mjordan/twi...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Fri Mar 16 17:29:19 CDT 2012
Author: mjordan
Date: Fri Mar 16 17:29:14 2012
New Revision: 3098
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=3098
Log:
Check in semi-working Twisted management of the Asterisk process
This works, for varying definitions of work. A few tests are probably going to
shred themselves in a blaze of asynchronous glory, but you can't make an omelette
without breaking a few eggs.
After this: SIPp! PJSUA... not sure what to do with that, since we don't have a
wrapper class. I may just steal Terry's work item and do that in conjunction with
this work.
Modified:
asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py
asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py
asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py
asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py
asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py
asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test
asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test
asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test
asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test
asterisk/team/mjordan/twisted_process/tests/mixmonitor/run-test
asterisk/team/mjordan/twisted_process/tests/mixmonitor_audiohook_inherit/run-test
Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/CDRTestCase.py Fri Mar 16 17:29:14 2012
@@ -132,12 +132,23 @@
def ami_test_done(self, ami, event):
if event.get("event") == "Hangup":
- if self.no_active_channels():
- try:
- self.stop_reactor()
- except ReactorNotRunning:
- # No problemo.
- pass
+ self.check_active_channels()
+
+ def check_active_channels(self):
+ """
+ Check to see if we have any active channels. If we don't kill the reactor.
+ """
+ def __parse_output(result):
+ first_line = result.output.split('\n', 1)[0]
+ first_number = first_line.split(' ', 1)[0]
+ if first_number == '0':
+ logger.debug("No channels detected; stopping reactor")
+ self.stop_reactor()
+
+ for asterisk in self.ast:
+ # 0 active channels
+ cli_deferred = asterisk.cli_exec('core show channels count')
+ cli_deferred.addCallback(__parse_output)
def run(self):
"""
Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/ChannelTestCondition.py Fri Mar 16 17:29:14 2012
@@ -12,6 +12,7 @@
import unittest
from TestConditions import TestCondition
+from twisted.internet import defer
logger = logging.getLogger(__name__)
@@ -32,9 +33,8 @@
self.allowed_channels = test_config.config['allowedchannels']
def evaluate(self, related_test_condition = None):
- for ast in self.ast:
- """ For logging / debug purposes, do a full core show channels """
- channel_lines = ast.cli_exec('core show channels')
+ def __evaluate_channel_result(cli_object):
+ channel_lines = cli_object.output
channel_tokens = channel_lines.strip().split('\n')
active_channels = 0
for token in channel_tokens:
@@ -44,6 +44,18 @@
if active_channels > self.allowed_channels:
super(ChannelTestCondition, self).failCheck(
'Detected number of active channels %d is greater than the allowed %d on Asterisk %s' % (active_channels, self.allowed_channels, ast.host))
- """ Set to pass if we haven't detected any failures """
- super(ChannelTestCondition, self).passCheck()
+ def __pass_check(result):
+ super(ChannelTestCondition, self).passCheck()
+
+ deferred_list = []
+ for ast in self.ast:
+ """ For logging / debug purposes, do a full core show channels """
+ d = ast.cli_exec('core show channels')
+ d.addCallback(__evaluate_channel_result)
+ deferred_list.append(d)
+
+ dl = defer.DeferredList(deferred_list)
+ dl.addCallback(__pass_check)
+
+ return dl
Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/TestCase.py Fri Mar 16 17:29:14 2012
@@ -13,7 +13,7 @@
import os
import datetime
import time
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
from starpy import manager, fastagi
from asterisk import Asterisk
@@ -59,6 +59,7 @@
self.testStateController = None
self.pcap = None
self.pcapfilename = None
+ self.__stopping = False
self.testlogdir = os.path.join(Asterisk.test_suite_root, self.base, str(os.getpid()))
self.ast_version = AsteriskVersion()
@@ -199,66 +200,110 @@
return PcapListener(device, bpf_filter, dumpfile, self.__pcap_callback)
def start_asterisk(self):
- return
+ """ This is kept mostly for legacy support. When the TestCase previously
+ used a synchronous, blocking mechanism independent of the twisted
+ reactor to spawn Asterisk, this method was called. Now the Asterisk
+ instances are started immediately after the reactor has started.
+
+ Derived methods can still implement this and have it be called when
+ the reactor is running, but immediately before instances of Asterisk
+ are launched.
+ """
+ pass
def __start_asterisk(self):
"""
Start the instances of Asterisk that were previously created. See
- create_asterisk. Note that this should be called before the reactor is
- told to run.
- """
+ create_asterisk. Note that this should be the first thing called
+ when the reactor has started to run
+ """
+ def __perform_pre_checks(result):
+ """ Execute the pre-condition checks """
+ self.testConditionController.evaluate_pre_checks()
+ return result
+
+ def __run_callback(result):
+ """ Notify the test that we are running """
+ self.run()
+
+ def __check_success_failure(result):
+ """ Make sure the instances started properly """
+ for (success, value) in result:
+ if not success:
+ logger.error(value)
+ self.stop_reactor()
+ return result
+
+ # Call the method that derived objects can override
+ self.start_asterisk()
+
+ start_defers = []
for index, item in enumerate(self.ast):
logger.info("Starting Asterisk instance %d" % (index + 1))
- self.ast[index].start()
- self.testConditionController.evaluate_pre_checks()
+ temp_defer = self.ast[index].start()
+ start_defers.append(temp_defer)
+
+ d = defer.DeferredList(start_defers, consumeErrors=True)
+ d.addCallback(__check_success_failure)
+ d.addCallback(__perform_pre_checks)
+ d.addCallback(__run_callback)
def stop_asterisk(self):
"""
- Stop the instances of Asterisk that were previously started. See
- start_asterisk. Note that this should be called after the reactor has
- returned from its run.
-
- If there were errors exiting asterisk, this function will return False.
- """
+ Called when the instances of Asterisk are being stopped. Note that previously,
+ this explicitly stopped the Asterisk instances: now they are stopped automatically
+ when the reactor is stopped.
+ """
+ pass
+
+ def __stop_asterisk(self):
+ """ Stops the instances of Asterisk.
+
+ Stops the instances of Asterisk - called when stop_reactor is called. This
+ returns a deferred object that can be used to be notified when all instances
+ of Asterisk have stopped.
+ """
+
+ def __check_success_failure(result):
+ """ Make sure the instances stopped properly """
+ for (success, value) in result:
+ if not success:
+ logger.warning(value)
+ # This should already be called when the reactor is being terminated
+ # if we couldn't stop the instance of Asterisk, there isn't much else to do
+ # here other then complain
+ return result
+
res = True
self.testConditionController.evaluate_post_checks()
+ stop_defers = []
for index, item in enumerate(self.ast):
logger.info("Stopping Asterisk instance %d" % (index + 1))
- returncode = self.ast[index].stop()
- if returncode < 0:
- # XXX setting passed here might be overridden later in a
- # derived class. This is bad.
- self.passed = False
- logger.error("Asterisk instance %d exited with signal %d" % (index + 1, abs(returncode)))
- res = False
- elif returncode > 0:
- # XXX same here
- self.passed = False
- logger.error("Asterisk instance %d exited with non-zero return code %d" % (index + 1, returncode))
- res = False
-
- return res
-
- def no_active_channels(self):
- """
- Return true if all our asterisk children have 0 active channels.
- """
- for asterisk in self.ast:
- # 0 active channels
- first_line = asterisk.cli_exec('core show channels count').split('\n', 1)[0]
- # 0
- first_number = first_line.split(' ', 1)[0]
- if first_number != '0':
- return False
- return True
+ temp_defer = self.ast[index].stop()
+ stop_defers.append(temp_defer)
+
+ d = defer.DeferredList(stop_defers, consumeErrors=True)
+ d.addCallback(__check_success_failure)
+ return d
def stop_reactor(self):
"""
Stop the reactor and cancel the test.
"""
- logger.info("Stopping Reactor")
- if reactor.running:
- reactor.stop()
+ def __stop_reactor(result):
+ """ Called when the Asterisk instances are stopped """
+ logger.info("Stopping Reactor")
+ if reactor.running:
+ try:
+ reactor.stop()
+ except twisted.internet.error.ReactorNotRunning:
+ # Meh.
+ pass
+ if not self.__stopping:
+ logger.info("Stopping Asterisk instances")
+ df = self.__stop_asterisk()
+ df.addCallback(__stop_reactor)
+ self.__stopping = True
def __reactor_timeout(self):
"""
@@ -269,10 +314,16 @@
self.stop_reactor()
def __run(self):
- logger.debug("Reactor is running: %s" % str(reactor.running))
+ """
+ Private entry point called when the reactor first starts up.
+ This needs to first ensure that Asterisk is fully up and running before
+ moving on.
+ """
if (self.ast):
self.__start_asterisk()
- self.run()
+ else:
+ # If no instances of Asterisk are needed, go ahead and just run
+ self.run()
def run(self):
"""
@@ -314,7 +365,6 @@
pass
def __pcap_callback(self, packet):
- logger.debug("Received packet: %s\n" % (packet,))
self.pcap_callback(packet)
def handleOriginateFailure(self, reason):
Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/asterisk.py Fri Mar 16 17:29:14 2012
@@ -22,14 +22,14 @@
from config import ConfigFile
from version import AsteriskVersion
-from twisted.internet import reactor, protocol, threads
+from twisted.internet import reactor, protocol, threads, defer
from twisted.internet import utils
logger = logging.getLogger(__name__)
class AsteriskCliCommand():
"""
- Manages an Asterisk CLI command. This is a blocking operation
+ Class that manages an Asterisk CLI command.
"""
def __init__(self, host, cmd):
@@ -46,59 +46,82 @@
self.err = ""
def execute(self):
- df = threads.deferToThread(self.__execute)
- df.addErrback(self.__thread_err)
-
- def __execute(self):
- logger.debug("Spawning command")
+ """
+ Execute the CLI command.
+
+ Returns a deferred that will be called when the operation completes. The
+ parameter to the deferred is this object.
+ """
+ def __cli_output_callback(result):
+ """ Handle regular output from the Asterisk CLI instance """
+ self.__set_properties(result)
+ logger.debug("Asterisk CLI %s exited %d" % (self.__host, self.exitcode))
+ if self.exitcode:
+ self.__deferred.errback(self)
+ else:
+ self.__deferred.callback(self)
+
+ def __cli_error_callback(result):
+ """ Handle an error coming back from the Asterisk CLI instance """
+ self.__set_properties(result)
+ logger.warning("Asterisk CLI %s exited %d with error: %s" % (self.__host, self.exitcode, self.err))
+ self.__deferred.errback(self)
+
+ self.__deferred = defer.Deferred()
df = utils.getProcessOutputAndValue(self.__cmd[0], self.__cmd)
- df.addCallbacks(self.__cli_output_callback, self.__cli_error_callback)
-
- def __cli_output_callback(self, result):
- #self.transport.loseConnection()
- logger.debug("Asterisk CLI %s exited %s" % (self.__host, str(result)))
- self.__set_properties(result)
-
- def __cli_error_callback(self, result):
- #self.transport.loseConnection()
- self.__set_properties(result)
- logger.debug("Asterisk CLI %s exited %s" % (self.__host, str(result)))
- logger.warning("Asterisk CLI %s exited %d with error: %s" % (self.__host, self.exitcode, self.err))
-
- def __thread_err(self, result):
- logger.warning("Asterisk CLI %s failed to spawn properly: %s" % (self.__host, str(result)))
+ df.addCallback(__cli_output_callback)
+ df.addErrback(__cli_error_callback)
+
+ return self.__deferred
def __set_properties(self, result):
+ """ Set the properties based on the result of the getProcessOutputAndValue call """
out, err, code = result
self.exitcode = code
self.output = out
self.err = err
class AsteriskProtocol(protocol.ProcessProtocol):
- def __init__(self, host):
+ """
+ Class that manages an Asterisk instance
+ """
+
+ def __init__(self, host, stop_deferred):
+ """ Create an AsteriskProtocol object
+
+ Create an AsteriskProtocol object, which manages the interactions with
+ the Asterisk process
+
+ Keyword Arguments:
+ host - the hostname or address of the Asterisk instance
+ stop_deferred - a twisted Deferred object that will be called when the
+ process has exited
+ """
+
self.output = ""
self.__host = host
self.exitcode = 0
self.exited = False
+ self.__stop_deferred = stop_deferred
def outReceived(self, data):
+ """ Override of ProcessProtocol.outReceived """
logger.debug("Asterisk %s received: %s" % (self.__host, data))
self.output += data
def connectionMade(self):
+ """ Override of ProcessProtocol.connectionMade """
logger.debug("Asterisk %s - connection made" % (self.__host))
def errReceived(self, data):
+ """ Override of ProcessProtocol.errReceived """
logger.warn("Asterisk %s received error: %s" % (self.__host, data))
- def processExited(self, reason):
- logger.debug("Asterisk %s exited with code %d" % (self.__host, reason.value.exitCode,))
- self.exitcode = reason.value.exitCode
- self.exitcode = True
-
def processEnded(self, reason):
+ """ Override of ProcessProtocol.processEnded """
logger.debug("Asterisk %s ended with code %d" % (self.__host, reason.value.exitCode,))
self.exitcode = reason.value.exitCode
+ self.__stop_deferred.callback("Asterisk %s ended with code %d" % (self.__host, reason.value.exitCode,))
self.exited = True
class Asterisk:
@@ -181,9 +204,11 @@
self.directories[var] = val
def start(self):
- """Start this instance of Asterisk.
-
- This function starts up this instance of Asterisk.
+ """ Start this instance of Asterisk.
+
+ Returns:
+ A deferred object that is called when Asterisk is fully booted, or an
+ error if Asterisk fails to start
Example Usage:
asterisk.start()
@@ -191,6 +216,20 @@
Note that calling this will install the default testsuite
config files, if they have not already been installed
"""
+
+ def __wait_fully_booted_callback(cli_command):
+ self.__start_deferred.callback("Successfully started Asterisk %s" % self.host)
+
+ def __wait_fully_booted_error(cli_command):
+ if time.time() - self.__start_asterisk_time > 5:
+ logger.error("Asterisk core waitfullybooted for %s failed" % self.host)
+ self.__start_deferred.errback("Command core waitfullybooted failed")
+ else:
+ logger.debug("Asterisk core waitfullybooted failed, attempting again...")
+ cli_deferred = self.cli_exec("core waitfullybooted")
+ cli_deferred.addCallback(__wait_fully_booted_callback)
+ cli_deferred.addErrback(__wait_fully_booted_error)
+
self.install_configs(os.getcwd() + "/configs")
self.__setup_configs()
@@ -200,75 +239,120 @@
"-C", "%s" % os.path.join(self.astetcdir, "asterisk.conf")
]
- self.processProtocol = AsteriskProtocol(self.host)
+ self.__start_deferred = defer.Deferred()
+ self.__stop_deferred = defer.Deferred()
+ self.processProtocol = AsteriskProtocol(self.host, self.__stop_deferred)
self.process = reactor.spawnProcess(self.processProtocol, cmd[0], cmd)
- # Be _really_ sure that Asterisk has started up before returning.
-
- # Poll the instance to make sure we created it successfully
- if (self.processProtocol.exited):
- """ Rut roh, Asterisk process exited prematurely """
- logger.error("Asterisk instance %s exited prematurely with return code %d" % (self.host, self.processProtocol.exitcode))
-
- start = time.time()
- for i in xrange(5):
- time.sleep(1.0)
- # This command should stall until completed, but if an
- # exception occurs, it returns the empty string.
- if self.cli_exec("core waitfullybooted", warn_on_fail=False):
- break;
-
+ # Begin the wait fully booted cycle
+ self.__start_asterisk_time = time.time()
+ cli_deferred = self.cli_exec("core waitfullybooted")
+ cli_deferred.addCallback(__wait_fully_booted_callback)
+ cli_deferred.addErrback(__wait_fully_booted_error)
+ return self.__start_deferred
def stop(self):
"""Stop this instance of Asterisk.
This function is used to stop this instance of Asterisk.
+ Returns a deferred that can be used to detect when Asterisk exits,
+ or if it fails to exit
+
Example Usage:
asterisk.stop()
"""
+
+ def __send_stop_gracefully():
+ if self.ast_version < AsteriskVersion("1.6.0"):
+ cli_deferred = self.cli_exec("stop gracefully")
+ else:
+ cli_deferred = self.cli_exec("core stop gracefully")
+ return cli_deferred
+
+ def __send_stop_now():
+ if self.ast_version < AsteriskVersion("1.6.0"):
+ cli_deferred = self.cli_exec("stop now")
+ else:
+ cli_deferred = self.cli_exec("core stop now")
+ return cli_deferred
+
+ def __stop_gracefully_callback(cli_command):
+ logger.debug("Successfully stopped Asterisk %s" % self.host)
+
+ def __stop_gracefully_error(cli_command):
+ if time.time() - self.__stop_time > 5:
+ logger.warning("Asterisk graceful stop for %s failed, attempting stop now" % self.host)
+ self.__stop_time = time.time()
+ cli_deferred = __send_stop_now()
+ cli_deferred.addCallback(__stop_now_callback)
+ cli_deferred.addErrback(__stop_now_error)
+ else:
+ logger.debug("Asterisk graceful stop failed, attempting again...")
+ cli_deferred = __send_stop_gracefully()
+ cli_deferred.addCallback(__stop_gracefully_callback)
+ cli_deferred.addErrback(__stop_gracefully_error)
+
+ def __stop_now_callback(cli_command):
+ logger.debug("Successfully stopped Asterisk %s" % self.host)
+
+ def __stop_now_error(cli_command):
+ if time.time() - self.__stop_time > 5:
+ logger.warning("Asterisk stop now for %s failed, attempting TERM" % self.host)
+ # If even a "stop now" didn't do the trick, fall back to sending
+ # signals to the process. First, send a SIGTERM. If it _STILL_ hasn't
+ # gone away after another 5 seconds, send SIGKILL.
+ try:
+ self.process.signalProcess("TERM")
+ if not self.processProtocol.exited:
+ reactor.callLater(5, __check_process_and_kill)
+ except twisted.internet.error.ProcessExitedAlready:
+ # Probably that we sent a signal to a process that was already
+ # dead. Just ignore it.
+ pass
+ logger.debug("Successfully terminated Asterisk %s" % self.host)
+ else:
+ logger.debug("Asterisk stop now failed, attempting again...")
+ cli_deferred = __send_stop_now()
+ cli_deferred.addCallback(__stop_now_callback)
+ cli_deferred.addErrback(__stop_now_error)
+
+ def __check_process_and_kill():
+ try:
+ if not self.processProtocol.exited:
+ logger.info("Sending KILL to Asterisk %s" % self.host)
+ self.process.signalProcess("KILL")
+ except twisted.internet.error.ProcessExitedAlready:
+ # Pass on this
+ pass
+ # If you kill the process, the ProcessProtocol will never get the note that its dead
+ try:
+ self.process.loseConnection()
+ except:
+ pass
+ self.__stop_deferred.callback("Asterisk %s KILLED" % self.host)
+
+ def __cancel_kill(reason):
+ try:
+ self.__cancel_token.cancel()
+ except ValueError:
+ pass
+ return reason
+
# Start by asking to stop gracefully.
- if self.ast_version < AsteriskVersion("1.6.0"):
- self.cli_exec("stop gracefully")
- else:
- self.cli_exec("core stop gracefully")
- for i in xrange(5):
- time.sleep(1.0)
- if self.processProtocol.exited:
- return self.process.returncode
-
- # If the graceful shutdown did not complete within 5 seconds, ask
- # Asterisk to stop right now.
- if self.ast_version < AsteriskVersion("1.6.0"):
- self.cli_exec("stop now")
- else:
- self.cli_exec("core stop now")
- for i in xrange(5):
- time.sleep(1.0)
- if self.processProtocol.exited:
- return self.processProtocol.exitcode
-
- # If even a "stop now" didn't do the trick, fall back to sending
- # signals to the process. First, send a SIGTERM. If it _STILL_ hasn't
- # gone away after another 5 seconds, send SIGKILL.
- try:
- self.process.signalProcess("TERM")
- for i in xrange(5):
- time.sleep(1.0)
- if self.processProtocol.exited:
- return self.processProtocol.exitcode
- self.process.signalProcess("KILL")
- except twisted.internet.error.ProcessExitedAlready:
- # Probably that we sent a signal to a process that was already
- # dead. Just ignore it.
- pass
-
- # If we get to this point and Asterisk still hasn't exited, there isn't much
- # to do other then flag it
- if not self.processProtocol.exited:
- logger.error("Asterisk instance %s failed to exit" % self.host)
-
- return self.processProtocol.exitcode
+ self.__stop_time = time.time()
+ d = __send_stop_gracefully()
+ d.addCallback(__stop_gracefully_callback)
+ d.addErrback(__stop_gracefully_error)
+
+ # There's a chance that we'll get back a 'yes, I'll stop' from Asterisk, and then it
+ # just won't. Add a 10 second callback to the reactor to terminate the Asterisk instance
+ # with prejudice if it lies to us. That should be more then enough time for one of the
+ # previous methods to stop it in a nicer fashion.
+ self.__cancel_token = reactor.callLater(10, __check_process_and_kill)
+ self.__stop_deferred.addCallback(__cancel_kill)
+
+ return self.__stop_deferred
def install_configs(self, cfg_path):
"""Installs all files located in the configuration directory for this
@@ -389,6 +473,9 @@
Keyword Arguments:
blocking -- When True, do not return from this function until the CLI
command finishes running. The default is True.
+
+ Returns:
+ A deferred object that can be used to listen for command completion
Example Usage:
asterisk.originate("Local/a_exten at context extension b_exten at context")
@@ -412,17 +499,20 @@
<tech/data> extension <exten>@<context>"
if self.ast_version < AsteriskVersion("1.6.2"):
- self.cli_exec("originate %s" % argstr, blocking=blocking)
+ return self.cli_exec("originate %s" % argstr, blocking=blocking)
else:
- self.cli_exec("channel originate %s" % argstr, blocking=blocking)
+ return self.cli_exec("channel originate %s" % argstr, blocking=blocking)
def cli_exec(self, cli_cmd, blocking=True, warn_on_fail=True):
"""Execute a CLI command on this instance of Asterisk.
Keyword Arguments:
cli_cmd -- The command to execute.
- blocking -- When True, do not return from this function until the CLI
- command finishes running. The default is True.
+ blocking -- When True, the process is spawned as a blocking
+ operation, at least as well as we can with twisted.
+
+ Returns:
+ A deferred object that will be signaled when the process has exited
Example Usage:
asterisk.cli_exec("core set verbose 10")
@@ -438,18 +528,13 @@
logger.debug("Executing %s ..." % cmd)
if not blocking:
- cliProtocol = AsteriskProtocol(("CLI:%s" % self.host))
+ stop_deferred = defer.Deferred()
+ cliProtocol = AsteriskProtocol(("CLI:%s" % self.host), stop_deferred)
process = reactor.spawnProcess(cliProtocol, cmd[0], cmd)
- return ""
+ return stop_deferred
cliProtocol = AsteriskCliCommand(self.host, cmd)
- cliProtocol.execute()
- for i in range(0, 5):
- if cliProtocol.exitcode >= 0:
- return cliProtocol.output
- else:
- time.sleep(1.0)
- logger.debug("Checking again")
+ return cliProtocol.execute()
def __make_directory_structure(self):
""" Mirror system directory structure """
Modified: asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py (original)
+++ asterisk/team/mjordan/twisted_process/lib/python/asterisk/cdr.py Fri Mar 16 17:29:14 2012
@@ -15,6 +15,7 @@
import csv
import re
import logging
+import time
logger = logging.getLogger(__name__)
@@ -86,13 +87,23 @@
return
self.__records = []
- try:
- cdr = csv.DictReader(open(fn, "r"), AsteriskCSVCDRLine.get_fields(), ",")
- except IOError:
- logger.error("Failed to open CDR file '%s'" % (fn))
- return
- except:
- logger.error("Unexpected error: %s" % (sys.exc_info()[0]))
+
+ cdr = None
+ for i in range(0, 5):
+ # So... sometimes the CDR tests can exit really, really fast. If that happens,
+ # the CSV can sometimes not exist yet - attempt opening it a few times, and only
+ # error out if we fail a few times
+ try:
+ cdr = csv.DictReader(open(fn, "r"), AsteriskCSVCDRLine.get_fields(), ",")
+ break
+ except IOError as (errno, strerror):
+ logger.debug("IOError %d[%s] while opening CDR file '%s'" % (errno, strerror, fn))
+ except:
+ logger.debug("Unexpected error: %s" % (sys.exc_info()[0]))
+ time.sleep(1.0)
+
+ if not cdr:
+ logger.error("Unable to open CDR file '%s'" % (fn))
return
for r in cdr:
Modified: asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test (original)
+++ asterisk/team/mjordan/twisted_process/tests/apps/voicemail/func_vmcount/run-test Fri Mar 16 17:29:14 2012
@@ -62,9 +62,9 @@
self.voicemailManager.createDummyVoicemail("default", "1234", self.voicemailManager.oldFolderName, i, self.formats)
ami.registerEvent('UserEvent', self.user_event)
- self.ast[ami.id].cli_exec("dialplan set global EXPECTED_NEW 15")
- self.ast[ami.id].cli_exec("dialplan set global EXPECTED_URGENT 5")
- self.ast[ami.id].cli_exec("dialplan set global EXPECTED_OLD 15")
+ ami.setVar(channel = "", variable = "EXPECTED_NEW", value = "15")
+ ami.setVar(channel = "", variable = "EXPECTED_URGENT", value = "5")
+ ami.setVar(channel = "", variable = "EXPECTED_OLD", value = "15")
df1 = ami.originate("Local/s at listener", "voicemail", "1234", 1)
df1.addErrback(self.handleOriginateFailure)
Modified: asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test (original)
+++ asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_barge/run-test Fri Mar 16 17:29:14 2012
@@ -83,6 +83,9 @@
self.ami = ami
self.ami.registerEvent('ChanSpyStart', self.chanspyEvent)
+ self.ami.setVar(channel = "", variable = "TESTAUDIO1", value = self.audiofile1)
+ self.ami.setVar(channel = "", variable = "TALK_AUDIO", value = self.talkingaudio)
+
def amiLoginError(self, ami):
self.logLastStep("AMI login failed")
reactor.callLater(1, self.readResult)
@@ -115,9 +118,6 @@
playfilearg = "--play-file=%s.wav" % (self.talkingaudio)
self.pja = subprocess.Popen(['pjsua', '--local-port=5065', '--auto-answer=200', '--null-audio', '--auto-loop'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.pjb = subprocess.Popen(['pjsua', '--local-port=5066', '--auto-answer=200', playfilearg, '--null-audio', '--auto-play'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-
- self.ast1.cli_exec("core set global TESTAUDIO1 " + self.audiofile1)
- self.ast1.cli_exec("core set global TALK_AUDIO " + self.talkingaudio)
def stopProcesses(self):
self.logLastStep("Stopping Processes")
Modified: asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test (original)
+++ asterisk/team/mjordan/twisted_process/tests/chanspy/chanspy_w_mixmonitor/run-test Fri Mar 16 17:29:14 2012
@@ -79,6 +79,8 @@
self.logLastStep("Connected to the AMI")
self.ami = ami
self.ami.registerEvent('ChanSpyStart', self.chanspyEvent)
+ self.ami.setVar(channel = "", variable = "TESTAUDIO1", value = self.audiofile1)
+ self.ami.setVar(channel = "", variable = "TALK_AUDIO", value = self.talkingaudio)
def amiLoginError(self, ami):
self.logLastStep("AMI login failed")
@@ -119,9 +121,6 @@
self.pja = subprocess.Popen(['pjsua', '--local-port=5065', '--auto-answer=200', '--null-audio'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.pjb = subprocess.Popen(['pjsua', '--local-port=5066', '--auto-answer=200', '--null-audio'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.pjc = subprocess.Popen(['pjsua', '--local-port=5067', '--auto-answer=200', '--null-audio'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-
- self.ast1.cli_exec("core set global TESTAUDIO1 " + self.audiofile1)
- self.ast1.cli_exec("core set global TALK_AUDIO " + self.talkingaudio)
def stopProcesses(self):
self.logLastStep("Stopping Processes")
Modified: asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test?view=diff&rev=3098&r1=3097&r2=3098
==============================================================================
--- asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test (original)
+++ asterisk/team/mjordan/twisted_process/tests/dynamic-modules/run-test Fri Mar 16 17:29:14 2012
@@ -9,144 +9,136 @@
import sys
import os
+import logging
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
sys.path.append("lib/python")
from asterisk.asterisk import Asterisk
from asterisk.TestCase import TestCase
+logger = logging.getLogger(__name__)
class moduleLoadUnloadTest(TestCase):
def __init__(self):
TestCase.__init__(self)
self.create_asterisk()
+ def check_file(self, module):
+ module[1] = (os.path.isfile(self.ast[0].base + "/usr/lib/asterisk/modules/" + module[0]))
+ return module[1]
+
+ def pre_load_next(self):
+ self.__preload_test_counter += 1
+ if (self.__preload_test_counter < len(self.preload_res)):
+ self.pre_load_module(self.preload_res[self.__preload_test_counter])
+ else:
+ self.load_next()
+
+ def pre_load_module(self, module):
+ def __module_callback(reason):
+ text = "Loaded " + self.__module[0]
+ if reason.output.count(text) > 0:
+ self.__module[2] = True
+ self.pre_load_next()
+
+ if not self.check_file(module):
+ logger.info(module[0] + " does not exist! Skipping this test.")
+ self.pre_load_next()
+
+ df = self.ast[0].cli_exec("module load " + module[0])
+ df.addCallback(__module_callback)
+ self.__module = module
+
+ def load_next(self):
+ self.__load_test_counter += 1
+ if (self.__load_test_counter < len(self.res)):
+ self.load_module(self.res[self.__load_test_counter])
+ else:
+ self.unload_next()
+
def load_module(self, module):
+ def __module_callback(reason):
+ text = "Loaded " + self.__module[0]
+ if reason.output.count(text) > 0:
+ self.__module[2] = True
+ self.load_next()
+
if not self.check_file(module):
- print module[0] + " does not exist! Skipping test."
- return False
+ logger.info(module[0] + " does not exist! Skipping test.")
+ self.load_next()
- text = "Loaded " + module[0]
- res = self.ast[0].cli_exec("module load " + module[0])
- if res.count(text) > 0:
- module[2] = True
- return module[2]
+ df = self.ast[0].cli_exec("module load " + module[0])
+ df.addCallback(__module_callback)
+ self.__module = module
+
+ def unload_next(self):
+ self.__unload_test_counter += 1
+ if (self.__unload_test_counter < len(self.unloads)):
+ self.unload_module(self.unloads[self.__unload_test_counter])
+ else:
+ if not self.__checking_results:
+ self.__checking_results = True
+ self.check_results()
def unload_module(self, module):
+ def __module_callback(reason):
+ test = "Unloaded " + self.__module[0]
+ if reason.output.count(text) > 0:
+ self.__module[3] = True
+ self.unload_next()
+
[... 199 lines stripped ...]
More information about the asterisk-commits
mailing list