[asterisk-commits] mjordan: testsuite/asterisk/trunk r5037 - in /asterisk/trunk/tests/apps: chan...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu May 15 11:28:58 CDT 2014

Author: mjordan
Date: Thu May 15 11:28:55 2014
New Revision: 5037

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=5037
apps/mixmonitor|chanspy: Fix failing tests

Much like the FastAGI tests, these two tests (mixmonitor and chanspy_barge)
required knowledge of the test artifact directory structure but made these
assumptions outside of the TestCase class. This patch ports these two tests
over to use TestCase as well.


Modified: asterisk/trunk/tests/apps/chanspy/chanspy_barge/configs/ast1/extensions.conf
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/apps/chanspy/chanspy_barge/configs/ast1/extensions.conf?view=diff&rev=5037&r1=5036&r2=5037
--- asterisk/trunk/tests/apps/chanspy/chanspy_barge/configs/ast1/extensions.conf (original)
+++ asterisk/trunk/tests/apps/chanspy/chanspy_barge/configs/ast1/extensions.conf Thu May 15 11:28:55 2014
@@ -22,3 +22,4 @@
 exten => play_recording,1,Answer()
 exten => play_recording,n,PlayBack(${TESTAUDIO1})
+exten => play_recording,n,Echo()

Modified: asterisk/trunk/tests/apps/chanspy/chanspy_barge/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/apps/chanspy/chanspy_barge/run-test?view=diff&rev=5037&r1=5036&r2=5037
--- asterisk/trunk/tests/apps/chanspy/chanspy_barge/run-test (original)
+++ asterisk/trunk/tests/apps/chanspy/chanspy_barge/run-test Thu May 15 11:28:55 2014
@@ -1,7 +1,8 @@
 #!/usr/bin/env python
-Copyright (C) 2010, Digium, Inc.
+Copyright (C) 2010-2014, Digium, Inc.
 David Vossel <dvossel at digium.com>
+Matt Jordan <mjordan at digium.com>
 This program is free software, distributed under the terms of
 the GNU General Public License Version 2.
@@ -11,19 +12,23 @@
 import os
 import signal
 import subprocess
+import logging
 from twisted.application import service, internet
 from twisted.internet import reactor, defer
 from starpy import manager
 from starpy import fastagi
-from asterisk.asterisk import Asterisk
+from asterisk.test_case import TestCase
-workingdir = "apps/chanspy/chanspy_barge"
-testdir = "tests/%s" % workingdir
+LOGGER = logging.getLogger(__name__)
-class ChanSpyBarge:
+testdir = "tests/apps/chanspy/chanspy_barge"
+class ChanSpyBarge(TestCase):
     def __init__(self):
+        super(ChanSpyBarge, self).__init__()
         self.last_step = ""
         self.passed = False
         self.numSpyEvents = 0
@@ -31,21 +36,18 @@
         self.talkDetected = 0
         self.test_to = 25
-        reactor.callWhenRunning(self.run)
-        self.agi = fastagi.FastAGIFactory(self.reportPassed)
-        reactor.listenTCP(4573, self.agi, self.test_to, '')
+        self.create_asterisk()
+        self.create_fastagi_factory()
         ast_conf_options = {
             "transmit_silence" : "yes",
-        self.ast1 = Asterisk(base=workingdir, ast_conf_options=ast_conf_options)
-        self.ast1.install_configs("%s/configs/ast1" % (testdir))
         self.talkingaudio = os.path.join(os.getcwd(), "%s/sounds/talking" % (testdir))
         self.audiofile1 = os.path.join(os.getcwd(), "%s/testaudio1" % (testdir))
-    def reportPassed(self, agi):
-        print "GOT PASS RESULTS!!!\n"
+    def fastagi_connect(self, agi):
+        LOGGER.info("GOT PASS RESULTS!!!")
         sequence = fastagi.InSequence()
         sequence.append(agi.execute, "HangUp")
@@ -55,101 +57,80 @@
     def readResult(self):
-        self.logLastStep("Reading results")
-        self.ast1.cli_exec("core show locks")   # get lock output in case of deadlock before tearing down.
-        self.ast1.cli_exec("core show channels")# if channels are still up for some reason, we want to know that as well
+        LOGGER.info("Reading results")
+        self.ast[0].cli_exec("core show locks")   # get lock output in case of deadlock before tearing down.
+        self.ast[0].cli_exec("core show channels")# if channels are still up for some reason, we want to know that as well
-        self.ast1.cli_exec("core show globals") # The global variables here hold failure conditions
+        self.ast[0].cli_exec("core show globals") # The global variables here hold failure conditions
         if self.passed == True:
-            print 'SIP ChanSpy test PASSED!'
+            LOGGER.info('SIP ChanSpy test PASSED!')
-            print 'SIP ChanSpy Test FAILED'
-    def stop_reactor(self):
-        def __finish_stop(result):
-            print "Stopping Reactor ..."
-            if reactor.running:
-                reactor.stop()
-            return result
-        df = self.ast1.stop()
-        df.addCallback(__finish_stop)
+            LOGGER.error('SIP ChanSpy Test FAILED')
     def chanspyEvent(self, ami, event):
-        self.logLastStep('Received ChanSpyStart event')
+        LOGGER.info('Received ChanSpyStart event')
         self.numSpyEvents += 1
         if event['spyeechannel'].count('end_a') > 0:
             reactor.callLater(3, self.aHangup)
             reactor.callLater(4, self.bHangup)
-    def amiOnConnect(self, ami):
-        self.logLastStep("Connected to the AMI")
+    def ami_connect(self, ami):
         self.ami = ami
         self.ami.registerEvent('ChanSpyStart', self.chanspyEvent)
         self.ami.setVar(channel = "", variable = "TESTAUDIO1", value = self.audiofile1)
         self.ami.setVar(channel = "", variable = "TALK_AUDIO", value = self.talkingaudio)
-    def amiLoginError(self, ami):
-        self.logLastStep("AMI login failed")
-        reactor.callLater(1, self.readResult)
+        reactor.callLater(1, self.callChanSpy)
+        reactor.callLater(2, self.aCall)
-    def amiLogin(self):
-        self.logLastStep("Logging in to the AMI")
-        self.ami_factory = manager.AMIFactory("user", "mysecret")
-        self.ami_factory.login('', 5038).addCallbacks(self.amiOnConnect, self.amiLoginError)
     def aCall(self):
-        self.logLastStep("A Calling into Wait")
+        LOGGER.info("A Calling into Wait")
         self.pja.stdin.write("sip:play_exten at\n")
     def bCall(self):
-        self.logLastStep("B Calling into Playback")
+        LOGGER.info("B Calling into Playback")
         self.pjb.stdin.write("sip:play_exten at\n")
     def aHangup(self):
+        LOGGER.info("Hanging up A")
     def bHangup(self): #calls into chanspy extension and plays audio to A using Barge
+        LOGGER.info("Hanging up B")
         reactor.callLater(2, self.verifyAudio)
     def startProcesses(self):
-        self.logLastStep("Starting Processes")
+        LOGGER.info("Starting Processes")
         playfilearg = "--play-file=%s.wav" % (self.talkingaudio)
         self.pja = subprocess.Popen(['pjsua', '--local-port=5065', '--auto-answer=200', '--null-audio', '--auto-loop'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
         self.pjb = subprocess.Popen(['pjsua', '--local-port=5066', '--auto-answer=200', playfilearg, '--null-audio', '--auto-play'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
     def stopProcesses(self):
-        self.logLastStep("Stopping Processes")
+        LOGGER.info("Stopping Processes")
         os.kill(self.pja.pid, signal.SIGKILL)
         os.kill(self.pjb.pid, signal.SIGKILL)
-    def logLastStep(self, step):
-        print step
-        self.lastStep = step
     def callChanSpy(self):
-        self.logLastStep("Placing call to ChanSpy extension.")
-        self.ast1.cli_originate("SIP/end_b extension chanspytest at test")
+        LOGGER.info("Placing call to ChanSpy extension.")
+        self.ast[0].cli_originate("SIP/end_b extension chanspytest at test")
     def verifyAudio(self):
-        self.ast1.cli_originate("Local/play_recording at test extension detect_audio at test")
+        LOGGER.info("Verifying Audio")
+        self.ast[0].cli_originate("Local/play_recording at test extension detect_audio at test")
+    def stop_asterisk(self):
+        if not self.passed:
+            self.readResult()
     def run(self):
-        def __finish_start_ops(result):
-            # call extensions
-            self.amiLogin()
-            reactor.callLater(1, self.callChanSpy)
-            reactor.callLater(2, self.aCall)
-            # stop and read results after timeout
-            reactor.callLater(self.test_to, self.readResult)
-            return result
+        self.create_ami_factory()
-        df = self.ast1.start()
-        df.addCallback(__finish_start_ops)
 def main():
     test = ChanSpyBarge()

Modified: asterisk/trunk/tests/apps/mixmonitor/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/apps/mixmonitor/run-test?view=diff&rev=5037&r1=5036&r2=5037
--- asterisk/trunk/tests/apps/mixmonitor/run-test (original)
+++ asterisk/trunk/tests/apps/mixmonitor/run-test Thu May 15 11:28:55 2014
@@ -1,7 +1,8 @@
 #!/usr/bin/env python
-Copyright (C) 2010, Digium, Inc.
+Copyright (C) 2010-2014, Digium, Inc.
 David Vossel <dvossel at digium.com>
+Matt Jordan <mjordan at digium.com>
 This program is free software, distributed under the terms of
 the GNU General Public License Version 2.
@@ -10,17 +11,21 @@
 import sys
 import os
 import math
+import logging
 from twisted.internet import reactor
-from starpy import manager
-from asterisk.asterisk import Asterisk
+from test_case import TestCase
-workingdir = "apps/mixmonitor"
-testdir = "tests/%s" % workingdir
+testdir = "tests/apps/mixmonitor"
-class MixMonitorTest:
+LOGGER = logging.getLogger(__name__)
+class MixMonitorTest(TestCase):
     def __init__(self):
+        super(MixMonitorTest, self).__init__()
         self.passed = False
         self.last_step = ""
         # playback file is 2559 bytes of ulaw, that will come out to be ~41118 of wav
@@ -30,21 +35,18 @@
         self.audiofile2size = -1
         self.audiofile3size = -1
-        reactor.callWhenRunning(self.run)
-        self.ast1 = Asterisk(base=workingdir)
-        self.ast1.install_configs("%s/configs/ast1" % (testdir))
         self.audiofile1 = os.path.join(os.getcwd(), "%s/testaudio1" % (testdir))
         self.audiofile2 = os.path.join(os.getcwd(), "%s/testaudio2" % (testdir))
         self.audiofile3 = os.path.join(os.getcwd(), "%s/testaudio3" % (testdir))
         self.talkingaudio = os.path.join(os.getcwd(), "%s/sounds/talking" % (testdir))
+        self.create_asterisk()
     def read_result(self):
         self.passed = True
-        self.log_last_step("Reading result file from MixMonitor")
+        LOGGER.info("Reading result file from MixMonitor")
         if os.path.exists(self.audiofile1 + ".raw"):
             self.audiofile1size = os.path.getsize(self.audiofile1 + ".raw")
         if os.path.exists(self.audiofile2 + ".raw"):
@@ -52,54 +54,42 @@
         if os.path.exists(self.audiofile3 + ".raw"):
             self.audiofile3size = os.path.getsize(self.audiofile3 + ".raw")
-        self.log_last_step("audiofile1 size is %d, a negative size indicates the file was not present." % (self.audiofile1size, ))
+        LOGGER.debug("audiofile1 size is %d, a negative size indicates the file was not present." % (self.audiofile1size, ))
         if math.fabs(self.audiofile1size - self.expectedfilesize) > self.filesizetolerance:
             # if this failed mixmonitor is not creating the correct file size for the time we expect.
-            self.log_last_step("audiofile1 size is not within the size tolerance.")
+            LOGGER.error("audiofile1 size is not within the size tolerance.")
             self.passed = False
-        self.log_last_step("audiofile2 size is %d, a negative size indicates the file was not present." % (self.audiofile2size, ))
+        LOGGER.debug("audiofile2 size is %d, a negative size indicates the file was not present." % (self.audiofile2size, ))
         if math.fabs(self.audiofile2size - self.expectedfilesize) > self.filesizetolerance:
             # if this failed it is likely because StopMixMonitor never let go of audiofile1
-            self.log_last_step("audiofile2 size is not within the size tolerance.")
+            LOGGER.error("audiofile2 size is not within the size tolerance.")
             self.passed = False
-        self.log_last_step("audiofile3 size is %d, a negative size indicates the file was not present." % (self.audiofile3size, ))
+        LOGGER.debug("audiofile3 size is %d, a negative size indicates the file was not present." % (self.audiofile3size, ))
         if self.audiofile3size == -1:
             # if this failed it is likely because MixMonitor never let go of audiofile2 on hangup
-            self.log_last_step("audiofile3 file does not exist.")
+            LOGGER.error("audiofile3 file does not exist.")
             self.passed = False
         if self.passed == True:
-            self.log_last_step("Test Passed... All audio files are the correct.")
-    def stop_reactor(self):
-        def __finish_stop(result):
-            print "Stopping Reactor ..."
-            if reactor.running:
-                reactor.stop()
-            return result
-        df = self.ast1.stop()
-        df.addCallback(__finish_stop)
+            LOGGER.info("Test Passed... All audio files are the correct.")
     def launch_test1(self):
-        self.log_last_step("Placing call to test1 exten")
-        self.ast1.cli_originate("Local/s at listener extension s at test1")
+        LOGGER.info("Placing call to test1 exten")
+        self.ast[0].cli_originate("Local/s at listener extension s at test1")
     def launch_test2(self):
-        self.log_last_step("Placing call to test2 exten")
-        self.ast1.cli_originate("Local/s at listener extension s at test2")
+        LOGGER.info("Placing call to test2 exten")
+        self.ast[0].cli_originate("Local/s at listener extension s at test2")
-    def log_last_step(self, step):
-        print step
-        self.last_step = step
     def check_test1(self, ami, event):
-        self.log_last_step("Checking Userevent")
+        LOGGER.info("Checking Userevent")
         if event.get("userevent").lower() != "test1":
         status = event.get("status")
-        print ("Status of test1 is %s" % (status))
+        LOGGER.debug("Status of test1 is %s" % (status))
         if status != "SUCCESS":
@@ -108,19 +98,18 @@
     def check_test2(self, ami, event):
-        self.log_last_step("Checking Userevent")
+        LOGGER.info("Checking Userevent")
         if event.get("userevent").lower() != "test2":
         status = event.get("status")
-        print ("Status of test2 is %s" % (status))
+        LOGGER.debug("Status of test2 is %s" % (status))
         if status != "SUCCESS":
-    def ami_on_connect1(self, ami):
-        self.log_last_step("Connected to AMI 1")
+    def ami_connect(self, ami):
         self.ami = ami
         self.ami.registerEvent("UserEvent", self.check_test1)
@@ -129,26 +118,10 @@
         self.ami.setVar(channel = "", variable = "TESTAUDIO3", value = self.audiofile3)
         self.ami.setVar(channel = "", variable = "TALK_AUDIO", value = self.talkingaudio)
-    def ami_login_error(self, ami):
-        self.log_last_step("AMI login failed")
-        self.stop_reactor()
-    def ami_login(self):
-        self.log_last_step("Logging in to the AMI")
-        self.ami_factory1 = manager.AMIFactory("user", "mysecret")
-        self.ami_factory1.login('', 5038).addCallbacks(self.ami_on_connect1, self.ami_login_error)
+        self.launch_test1()
     def run(self):
-        def __finish_start_ops(result):
-            self.ami_login()
-            self.launch_test1()
-            reactor.callLater(30, self.stop_reactor)
-            return result
-        self.log_last_step("Starting Asterisk")
-        df = self.ast1.start()
-        df.addCallback(__finish_start_ops)
+        self.create_ami_factory()

More information about the asterisk-commits mailing list