[asterisk-commits] mjordan: testsuite/asterisk/trunk r5037 - in /asterisk/trunk/tests/apps: chan...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu May 15 11:28:58 CDT 2014
Author: mjordan
Date: Thu May 15 11:28:55 2014
New Revision: 5037
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=5037
Log:
apps/mixmonitor|chanspy: Fix failing tests
Much like the FastAGI tests, these two tests (mixmonitor and chanspy_barge)
relied on assumptions about the test artifact directory structure, but
encoded those assumptions outside of the TestCase class. This patch ports
both tests over to use TestCase as well.
Modified:
asterisk/trunk/tests/apps/chanspy/chanspy_barge/configs/ast1/extensions.conf
asterisk/trunk/tests/apps/chanspy/chanspy_barge/run-test
asterisk/trunk/tests/apps/mixmonitor/run-test
Modified: asterisk/trunk/tests/apps/chanspy/chanspy_barge/configs/ast1/extensions.conf
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/apps/chanspy/chanspy_barge/configs/ast1/extensions.conf?view=diff&rev=5037&r1=5036&r2=5037
==============================================================================
--- asterisk/trunk/tests/apps/chanspy/chanspy_barge/configs/ast1/extensions.conf (original)
+++ asterisk/trunk/tests/apps/chanspy/chanspy_barge/configs/ast1/extensions.conf Thu May 15 11:28:55 2014
@@ -22,3 +22,4 @@
exten => play_recording,1,Answer()
exten => play_recording,n,PlayBack(${TESTAUDIO1})
+exten => play_recording,n,Echo()
Modified: asterisk/trunk/tests/apps/chanspy/chanspy_barge/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/apps/chanspy/chanspy_barge/run-test?view=diff&rev=5037&r1=5036&r2=5037
==============================================================================
--- asterisk/trunk/tests/apps/chanspy/chanspy_barge/run-test (original)
+++ asterisk/trunk/tests/apps/chanspy/chanspy_barge/run-test Thu May 15 11:28:55 2014
@@ -1,7 +1,8 @@
#!/usr/bin/env python
'''
-Copyright (C) 2010, Digium, Inc.
+Copyright (C) 2010-2014, Digium, Inc.
David Vossel <dvossel at digium.com>
+Matt Jordan <mjordan at digium.com>
This program is free software, distributed under the terms of
the GNU General Public License Version 2.
@@ -11,19 +12,23 @@
import os
import signal
import subprocess
+import logging
+
from twisted.application import service, internet
from twisted.internet import reactor, defer
from starpy import manager
from starpy import fastagi
sys.path.append("lib/python")
-from asterisk.asterisk import Asterisk
+from asterisk.test_case import TestCase
-workingdir = "apps/chanspy/chanspy_barge"
-testdir = "tests/%s" % workingdir
+LOGGER = logging.getLogger(__name__)
-class ChanSpyBarge:
+testdir = "tests/apps/chanspy/chanspy_barge"
+
+class ChanSpyBarge(TestCase):
def __init__(self):
+ super(ChanSpyBarge, self).__init__()
self.last_step = ""
self.passed = False
self.numSpyEvents = 0
@@ -31,21 +36,18 @@
self.talkDetected = 0
self.test_to = 25
- reactor.callWhenRunning(self.run)
- self.agi = fastagi.FastAGIFactory(self.reportPassed)
- reactor.listenTCP(4573, self.agi, self.test_to, '127.0.0.1')
+ self.create_asterisk()
+ self.create_fastagi_factory()
ast_conf_options = {
"transmit_silence" : "yes",
}
- self.ast1 = Asterisk(base=workingdir, ast_conf_options=ast_conf_options)
- self.ast1.install_configs("%s/configs/ast1" % (testdir))
self.talkingaudio = os.path.join(os.getcwd(), "%s/sounds/talking" % (testdir))
self.audiofile1 = os.path.join(os.getcwd(), "%s/testaudio1" % (testdir))
- def reportPassed(self, agi):
- print "GOT PASS RESULTS!!!\n"
+ def fastagi_connect(self, agi):
+ LOGGER.info("GOT PASS RESULTS!!!")
sequence = fastagi.InSequence()
sequence.append(agi.execute, "HangUp")
sequence.append(agi.finish)
@@ -55,101 +57,80 @@
def readResult(self):
self.stop_reactor()
- self.logLastStep("Reading results")
- self.ast1.cli_exec("core show locks") # get lock output in case of deadlock before tearing down.
- self.ast1.cli_exec("core show channels")# if channels are still up for some reason, we want to know that as well
+ LOGGER.info("Reading results")
+ self.ast[0].cli_exec("core show locks") # get lock output in case of deadlock before tearing down.
+ self.ast[0].cli_exec("core show channels")# if channels are still up for some reason, we want to know that as well
- self.ast1.cli_exec("core show globals") # The global variables here hold failure conditions
+ self.ast[0].cli_exec("core show globals") # The global variables here hold failure conditions
if self.passed == True:
- print 'SIP ChanSpy test PASSED!'
+ LOGGER.info('SIP ChanSpy test PASSED!')
else:
- print 'SIP ChanSpy Test FAILED'
-
- def stop_reactor(self):
- def __finish_stop(result):
- print "Stopping Reactor ..."
- if reactor.running:
- reactor.stop()
- return result
- df = self.ast1.stop()
- df.addCallback(__finish_stop)
+ LOGGER.error('SIP ChanSpy Test FAILED')
def chanspyEvent(self, ami, event):
- self.logLastStep('Received ChanSpyStart event')
+ LOGGER.info('Received ChanSpyStart event')
self.numSpyEvents += 1
if event['spyeechannel'].count('end_a') > 0:
reactor.callLater(3, self.aHangup)
reactor.callLater(4, self.bHangup)
- def amiOnConnect(self, ami):
- self.logLastStep("Connected to the AMI")
+ def ami_connect(self, ami):
self.ami = ami
self.ami.registerEvent('ChanSpyStart', self.chanspyEvent)
self.ami.setVar(channel = "", variable = "TESTAUDIO1", value = self.audiofile1)
self.ami.setVar(channel = "", variable = "TALK_AUDIO", value = self.talkingaudio)
- def amiLoginError(self, ami):
- self.logLastStep("AMI login failed")
- reactor.callLater(1, self.readResult)
+ reactor.callLater(1, self.callChanSpy)
+ reactor.callLater(2, self.aCall)
- def amiLogin(self):
- self.logLastStep("Logging in to the AMI")
- self.ami_factory = manager.AMIFactory("user", "mysecret")
- self.ami_factory.login('127.0.0.1', 5038).addCallbacks(self.amiOnConnect, self.amiLoginError)
def aCall(self):
- self.logLastStep("A Calling into Wait")
+ LOGGER.info("A Calling into Wait")
self.pja.stdin.write("m\n")
self.pja.stdin.write("sip:play_exten at 127.0.0.1:5060\n")
def bCall(self):
- self.logLastStep("B Calling into Playback")
+ LOGGER.info("B Calling into Playback")
self.pjb.stdin.write("m\n")
self.pjb.stdin.write("sip:play_exten at 127.0.0.1:5060\n")
def aHangup(self):
+ LOGGER.info("Hanging up A")
self.pja.stdin.write("h\n")
def bHangup(self): #calls into chanspy extension and plays audio to A using Barge
+ LOGGER.info("Hanging up B")
self.pjb.stdin.write("h\n")
reactor.callLater(2, self.verifyAudio)
def startProcesses(self):
- self.logLastStep("Starting Processes")
+ LOGGER.info("Starting Processes")
playfilearg = "--play-file=%s.wav" % (self.talkingaudio)
self.pja = subprocess.Popen(['pjsua', '--local-port=5065', '--auto-answer=200', '--null-audio', '--auto-loop'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.pjb = subprocess.Popen(['pjsua', '--local-port=5066', '--auto-answer=200', playfilearg, '--null-audio', '--auto-play'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
def stopProcesses(self):
- self.logLastStep("Stopping Processes")
+ LOGGER.info("Stopping Processes")
os.kill(self.pja.pid, signal.SIGKILL)
os.kill(self.pjb.pid, signal.SIGKILL)
- def logLastStep(self, step):
- print step
- self.lastStep = step
-
def callChanSpy(self):
- self.logLastStep("Placing call to ChanSpy extension.")
- self.ast1.cli_originate("SIP/end_b extension chanspytest at test")
+ LOGGER.info("Placing call to ChanSpy extension.")
+ self.ast[0].cli_originate("SIP/end_b extension chanspytest at test")
def verifyAudio(self):
- self.ast1.cli_originate("Local/play_recording at test extension detect_audio at test")
+ LOGGER.info("Verifying Audio")
+ self.ast[0].cli_originate("Local/play_recording at test extension detect_audio at test")
+
+ def stop_asterisk(self):
+ if not self.passed:
+ self.readResult()
def run(self):
- def __finish_start_ops(result):
- # call extensions
- self.amiLogin()
- reactor.callLater(1, self.callChanSpy)
- reactor.callLater(2, self.aCall)
- # stop and read results after timeout
- reactor.callLater(self.test_to, self.readResult)
- return result
+ self.create_ami_factory()
- df = self.ast1.start()
- df.addCallback(__finish_start_ops)
def main():
test = ChanSpyBarge()
Modified: asterisk/trunk/tests/apps/mixmonitor/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/apps/mixmonitor/run-test?view=diff&rev=5037&r1=5036&r2=5037
==============================================================================
--- asterisk/trunk/tests/apps/mixmonitor/run-test (original)
+++ asterisk/trunk/tests/apps/mixmonitor/run-test Thu May 15 11:28:55 2014
@@ -1,7 +1,8 @@
#!/usr/bin/env python
'''
-Copyright (C) 2010, Digium, Inc.
+Copyright (C) 2010-2014, Digium, Inc.
David Vossel <dvossel at digium.com>
+Matt Jordan <mjordan at digium.com>
This program is free software, distributed under the terms of
the GNU General Public License Version 2.
@@ -10,17 +11,21 @@
import sys
import os
import math
+import logging
+
from twisted.internet import reactor
-from starpy import manager
-sys.path.append("lib/python")
-from asterisk.asterisk import Asterisk
+sys.path.append("lib/python/asterisk")
+from test_case import TestCase
-workingdir = "apps/mixmonitor"
-testdir = "tests/%s" % workingdir
+testdir = "tests/apps/mixmonitor"
-class MixMonitorTest:
+LOGGER = logging.getLogger(__name__)
+
+class MixMonitorTest(TestCase):
def __init__(self):
+ super(MixMonitorTest, self).__init__()
+
self.passed = False
self.last_step = ""
# playback file is 2559 bytes of ulaw, that will come out to be ~41118 of wav
@@ -30,21 +35,18 @@
self.audiofile2size = -1
self.audiofile3size = -1
- reactor.callWhenRunning(self.run)
-
- self.ast1 = Asterisk(base=workingdir)
- self.ast1.install_configs("%s/configs/ast1" % (testdir))
-
self.audiofile1 = os.path.join(os.getcwd(), "%s/testaudio1" % (testdir))
self.audiofile2 = os.path.join(os.getcwd(), "%s/testaudio2" % (testdir))
self.audiofile3 = os.path.join(os.getcwd(), "%s/testaudio3" % (testdir))
self.talkingaudio = os.path.join(os.getcwd(), "%s/sounds/talking" % (testdir))
+ self.create_asterisk()
+
def read_result(self):
self.passed = True
self.stop_reactor()
- self.log_last_step("Reading result file from MixMonitor")
+ LOGGER.info("Reading result file from MixMonitor")
if os.path.exists(self.audiofile1 + ".raw"):
self.audiofile1size = os.path.getsize(self.audiofile1 + ".raw")
if os.path.exists(self.audiofile2 + ".raw"):
@@ -52,54 +54,42 @@
if os.path.exists(self.audiofile3 + ".raw"):
self.audiofile3size = os.path.getsize(self.audiofile3 + ".raw")
- self.log_last_step("audiofile1 size is %d, a negative size indicates the file was not present." % (self.audiofile1size, ))
+ LOGGER.debug("audiofile1 size is %d, a negative size indicates the file was not present." % (self.audiofile1size, ))
if math.fabs(self.audiofile1size - self.expectedfilesize) > self.filesizetolerance:
# if this failed mixmonitor is not creating the correct file size for the time we expect.
- self.log_last_step("audiofile1 size is not within the size tolerance.")
+ LOGGER.error("audiofile1 size is not within the size tolerance.")
self.passed = False
- self.log_last_step("audiofile2 size is %d, a negative size indicates the file was not present." % (self.audiofile2size, ))
+ LOGGER.debug("audiofile2 size is %d, a negative size indicates the file was not present." % (self.audiofile2size, ))
if math.fabs(self.audiofile2size - self.expectedfilesize) > self.filesizetolerance:
# if this failed it is likely because StopMixMonitor never let go of audiofile1
- self.log_last_step("audiofile2 size is not within the size tolerance.")
+ LOGGER.error("audiofile2 size is not within the size tolerance.")
self.passed = False
- self.log_last_step("audiofile3 size is %d, a negative size indicates the file was not present." % (self.audiofile3size, ))
+ LOGGER.debug("audiofile3 size is %d, a negative size indicates the file was not present." % (self.audiofile3size, ))
if self.audiofile3size == -1:
# if this failed it is likely because MixMonitor never let go of audiofile2 on hangup
- self.log_last_step("audiofile3 file does not exist.")
+ LOGGER.error("audiofile3 file does not exist.")
self.passed = False
if self.passed == True:
- self.log_last_step("Test Passed... All audio files are the correct.")
-
- def stop_reactor(self):
- def __finish_stop(result):
- print "Stopping Reactor ..."
- if reactor.running:
- reactor.stop()
- return result
- df = self.ast1.stop()
- df.addCallback(__finish_stop)
+ LOGGER.info("Test Passed... All audio files are the correct.")
def launch_test1(self):
- self.log_last_step("Placing call to test1 exten")
- self.ast1.cli_originate("Local/s at listener extension s at test1")
+ LOGGER.info("Placing call to test1 exten")
+ self.ast[0].cli_originate("Local/s at listener extension s at test1")
def launch_test2(self):
- self.log_last_step("Placing call to test2 exten")
- self.ast1.cli_originate("Local/s at listener extension s at test2")
+ LOGGER.info("Placing call to test2 exten")
+ self.ast[0].cli_originate("Local/s at listener extension s at test2")
- def log_last_step(self, step):
- print step
- self.last_step = step
def check_test1(self, ami, event):
- self.log_last_step("Checking Userevent")
+ LOGGER.info("Checking Userevent")
if event.get("userevent").lower() != "test1":
return
status = event.get("status")
- print ("Status of test1 is %s" % (status))
+ LOGGER.debug("Status of test1 is %s" % (status))
if status != "SUCCESS":
self.stop_reactor()
return
@@ -108,19 +98,18 @@
self.launch_test2()
def check_test2(self, ami, event):
- self.log_last_step("Checking Userevent")
+ LOGGER.info("Checking Userevent")
if event.get("userevent").lower() != "test2":
return
status = event.get("status")
- print ("Status of test2 is %s" % (status))
+ LOGGER.debug("Status of test2 is %s" % (status))
if status != "SUCCESS":
self.stop_reactor()
return
self.read_result()
- def ami_on_connect1(self, ami):
- self.log_last_step("Connected to AMI 1")
+ def ami_connect(self, ami):
self.ami = ami
self.ami.registerEvent("UserEvent", self.check_test1)
@@ -129,26 +118,10 @@
self.ami.setVar(channel = "", variable = "TESTAUDIO3", value = self.audiofile3)
self.ami.setVar(channel = "", variable = "TALK_AUDIO", value = self.talkingaudio)
- def ami_login_error(self, ami):
- self.log_last_step("AMI login failed")
- self.stop_reactor()
-
- def ami_login(self):
- self.log_last_step("Logging in to the AMI")
- self.ami_factory1 = manager.AMIFactory("user", "mysecret")
- self.ami_factory1.login('127.0.0.1', 5038).addCallbacks(self.ami_on_connect1, self.ami_login_error)
+ self.launch_test1()
def run(self):
- def __finish_start_ops(result):
- self.ami_login()
- self.launch_test1()
-
- reactor.callLater(30, self.stop_reactor)
- return result
-
- self.log_last_step("Starting Asterisk")
- df = self.ast1.start()
- df.addCallback(__finish_start_ops)
+ self.create_ami_factory()
More information about the asterisk-commits
mailing list