[asterisk-commits] mjordan: testsuite/asterisk/trunk r5046 - in /asterisk/trunk/tests: feature_a...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu May 15 13:20:12 CDT 2014


Author: mjordan
Date: Thu May 15 13:20:09 2014
New Revision: 5046

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=5046
Log:
feature tests: Update to use TestCase

These tests failed as well due to not being based on TestCase. This patch
updates them just enough to get them running on top of TestCase.

Modified:
    asterisk/trunk/tests/feature_attended_transfer/run-test
    asterisk/trunk/tests/feature_blonde_transfer/run-test

Modified: asterisk/trunk/tests/feature_attended_transfer/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/feature_attended_transfer/run-test?view=diff&rev=5046&r1=5045&r2=5046
==============================================================================
--- asterisk/trunk/tests/feature_attended_transfer/run-test (original)
+++ asterisk/trunk/tests/feature_attended_transfer/run-test Thu May 15 13:20:09 2014
@@ -1,160 +1,141 @@
 #!/usr/bin/env python
-'''
-Copyright (C) 2010, Digium, Inc.
+"""
+Copyright (C) 2010-2014, Digium, Inc.
 David Vossel <dvossel at digium.com>
+Matt Jordan <mjordan at digium.com>
 
 This program is free software, distributed under the terms of
 the GNU General Public License Version 2.
-'''
+"""
 
 import sys
 import os
 import math
+import logging
+
 from twisted.application import service, internet
 from twisted.internet import reactor, defer
-from starpy import manager
 
 sys.path.append("lib/python")
-from asterisk.asterisk import Asterisk
+from asterisk.test_case import TestCase
 
-workingdir = "feature_attended_transfer"
-testdir = "tests/%s" % workingdir
+LOGGER = logging.getLogger(__name__)
 
-class BlondeTransferTest:
+class AttendedTransferTest(TestCase):
     def __init__(self):
+        super(AttendedTransferTest, self).__init__()
+
         self.passed = False
         self.alpha = False
         self.bravo = False
         self.charlie = False
         self.charlie_alpha = False
+        self.channels = []
 
         # Test timeout in seconds
         self.test_to = 45
-
-        reactor.callWhenRunning(self.run)
-
-        self.ast1 = Asterisk(base=workingdir)
-        self.ast1.install_configs("%s/configs/ast1" % (testdir))
-
-        self.ast2 = Asterisk(base=workingdir)
-        self.ast2.install_configs("%s/configs/ast2" % (testdir))
+        self.create_asterisk(count=2)
 
 
     def check_result(self):
-        self.log_last_step("Checking results...")
+        LOGGER.debug("Checking results...")
         if (self.alpha is True and self.bravo is True and self.charlie is True
                 and self.charlie_alpha is True):
+            LOGGER.info("Test passed")
+            for channel in self.channels:
+                LOGGER.debug("Hanging up %s" % channel)
+                self.ami[1].hangup(channel)
             self.stop_reactor()
             self.passed = True
 
-    def stop_reactor(self):
-        def __finish_stop(result):
-            print "Stopping Reactor ..."
-            if reactor.running:
-                reactor.stop()
-            return result
-        df = self.ast1.stop()
-        df.addCallback(__finish_stop)
-
     def launch_test(self):
-        self.ast2.cli_originate("Local/1000@transfertest extension a_dial@transfertest")
-
-    def log_last_step(self, step):
-        print step
-        self.last_step = step
+        LOGGER.info("Starting the test...")
+        self.ast[1].cli_originate("Local/1000@transfertest extension a_dial@transfertest")
 
     def check_alpha(self, ami, event):
-        self.log_last_step("Checking Userevent")
         if event.get("userevent").lower() != "alpha":
             return
+
         status = event.get("status")
-        print ("Status of alpha is %s" % (status))
+        LOGGER.debug("Status of alpha is %s" % (status))
         if status != "SUCCESS":
+            LOGGER.warn("Alpha failed")
             self.stop_reactor()
             return
-
+        LOGGER.info("Alpha succeeded")
         self.alpha = True
         self.check_result()
 
     def check_bravo(self, ami, event):
-        self.log_last_step("Checking Userevent")
         if event.get("userevent").lower() != "bravo":
             return
+
         status = event.get("status")
-        print ("Status of bravo is %s" % (status))
+        LOGGER.debug("Status of bravo is %s" % (status))
         if status != "SUCCESS":
+            LOGGER.warn("Bravo failed")
             self.stop_reactor()
             return
-
+        LOGGER.info("Bravo succeeded")
         self.bravo = True
         self.check_result()
 
     def check_charlie_alpha(self, ami, event):
-        self.log_last_step("Checking Userevent")
         if event.get("userevent").lower() != "charlie_alpha":
             return
         status = event.get("status")
-        print ("Status of charlie_alpha is %s" % (status))
+        LOGGER.debug("Status of charlie_alpha is %s" % (status))
         if status != "SUCCESS":
+            LOGGER.warn("Charlie_alpha failed")
             self.stop_reactor()
             return
-
+        LOGGER.info("Charlie_alpha succeeded")
         self.charlie_alpha = True
         self.check_result()
 
     def check_charlie(self, ami, event):
-        self.log_last_step("Checking Userevent")
         if event.get("userevent").lower() != "charlie":
             return
         status = event.get("status")
-        print ("Status of charlie is %s" % (status))
+        LOGGER.debug("Status of charlie is %s" % (status))
         if status != "SUCCESS":
+            LOGGER.warn("Charlie failed")
             self.stop_reactor()
             return
-
+        LOGGER.info("Charlie succeeded")
         self.charlie = True
         self.check_result()
 
-    def ami_on_connect2(self, ami):
-        self.log_last_step("Connected to AMI 2")
-        self.ami2 = ami
-        self.ami2.registerEvent("UserEvent", self.check_bravo)
+    def new_channel_handler(self, ami, event):
+        self.channels.append(event['channel'])
 
-    def ami_on_connect1(self, ami):
-        self.log_last_step("Connected to AMI 1")
-        self.ami1 = ami
-        self.ami1.registerEvent("UserEvent", self.check_alpha)
-        self.ami1.registerEvent("UserEvent", self.check_charlie)
-        self.ami1.registerEvent("UserEvent", self.check_charlie_alpha)
+    def hangup_handler(self, ami, event):
+        if event['channel'] in self.channels:
+            self.channels.remove(event['channel'])
 
-    def ami_login_error(self, ami):
-        self.log_last_step("AMI login failed")
-        self.stop_reactor()
+    def ami_connect(self, ami):
+        """AMI Connection Handler"""
 
-    def ami_login(self):
-        self.log_last_step("Logging in to the AMI")
-        self.ami_factory1 = manager.AMIFactory("user", "mysecret")
-        self.ami_factory1.login('127.0.0.1', 5038).addCallbacks(self.ami_on_connect1, self.ami_login_error)
-        self.ami_factory2 = manager.AMIFactory("user", "mysecret")
-        self.ami_factory2.login('127.0.0.2', 5038).addCallbacks(self.ami_on_connect2, self.ami_login_error)
+        if (ami.id == 0):
+            ami.registerEvent("UserEvent", self.check_alpha)
+            ami.registerEvent("UserEvent", self.check_charlie)
+            ami.registerEvent("UserEvent", self.check_charlie_alpha)
+        else:
+            ami.registerEvent("UserEvent", self.check_bravo)
+            ami.registerEvent('Newchannel', self.new_channel_handler)
+            ami.registerEvent('Hangup', self.hangup_handler)
+
+        if len(self.ami) == 2:
+            self.launch_test()
 
     def run(self):
-        def __finish_start_ops(result):
-            self.ami_login()
-            self.launch_test()
+        super(AttendedTransferTest, self).run()
 
-            # stop and read results after timeout
-            reactor.callLater(self.test_to, self.stop_reactor)
-
-        self.log_last_step("Starting Asterisk")
-        df1 = self.ast1.start()
-        df2 = self.ast2.start()
-        dl = defer.DeferredList([df1, df2])
-        dl.addCallback(__finish_start_ops)
+        self.create_ami_factory(count=2)
 
 def main():
-     # Run Blonde Transfer Test
-    test = BlondeTransferTest()
+
+    test = AttendedTransferTest()
     reactor.run()
     if test.passed != True:
         return 1

Modified: asterisk/trunk/tests/feature_blonde_transfer/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/feature_blonde_transfer/run-test?view=diff&rev=5046&r1=5045&r2=5046
==============================================================================
--- asterisk/trunk/tests/feature_blonde_transfer/run-test (original)
+++ asterisk/trunk/tests/feature_blonde_transfer/run-test Thu May 15 13:20:09 2014
@@ -1,7 +1,8 @@
 #!/usr/bin/env python
 '''
-Copyright (C) 2010, Digium, Inc.
+Copyright (C) 2010-2014, Digium, Inc.
 David Vossel <dvossel at digium.com>
+Matt Jordan <mjordan at digium.com>
 
 This program is free software, distributed under the terms of
 the GNU General Public License Version 2.
@@ -10,131 +11,107 @@
 import sys
 import os
 import math
+import logging
+
 from twisted.application import service, internet
 from twisted.internet import reactor, defer
-from starpy import manager
 
 sys.path.append("lib/python")
-from asterisk.asterisk import Asterisk
+from asterisk.test_case import TestCase
 
-workingdir = "feature_blonde_transfer"
-testdir = "tests/%s" % workingdir
+LOGGER = logging.getLogger(__name__)
 
-class BlondeTransferTest:
+class BlondeTransferTest(TestCase):
     def __init__(self):
+        super(BlondeTransferTest, self).__init__()
         self.passed = False
         self.alpha = False
         self.bravo = False
         self.charlie = False
 
-        # Test timeout in seconds
-        self.test_to = 45
-
-        reactor.callWhenRunning(self.run)
-
-        self.ast1 = Asterisk(base=workingdir)
-        self.ast1.install_configs("%s/configs/ast1" % (testdir))
-
-        self.ast2 = Asterisk(base=workingdir)
-        self.ast2.install_configs("%s/configs/ast2" % (testdir))
-
+        self.channels = []
+        self.create_asterisk(count=2)
 
     def check_result(self):
-        self.log_last_step("Checking results...")
+        LOGGER.debug("Checking results...")
         if self.alpha is True and self.bravo is True and self.charlie is True:
+            LOGGER.info("Test passed")
+            for channel in self.channels:
+                LOGGER.debug("Hanging up %s" % channel)
+                self.ami[1].hangup(channel)
             self.stop_reactor()
             self.passed = True
 
-    def stop_reactor(self):
-        def __finish_stop(result):
-            print "Stopping Reactor ..."
-            if reactor.running:
-                reactor.stop()
-            return result
-        df = self.ast1.stop()
-        df.addCallback(__finish_stop)
-
     def launch_test(self):
-        self.ast2.cli_originate("Local/1000@transfertest extension a_dial@transfertest")
-
-    def log_last_step(self, step):
-        print step
-        self.last_step = step
+        LOGGER.info("Starting test...")
+        self.ast[1].cli_originate("Local/1000@transfertest extension a_dial@transfertest")
 
     def check_alpha(self, ami, event):
-        self.log_last_step("Checking Userevent")
         if event.get("userevent").lower() != "alpha":
             return
+
         status = event.get("status")
-        print ("Status of alpha is %s" % (status))
+        LOGGER.debug("Status of alpha is %s" % (status))
         if status != "SUCCESS":
+            LOGGER.warn("Alpha failed")
             self.stop_reactor()
             return
-
+        LOGGER.info("Alpha succeeded")
         self.alpha = True
         self.check_result()
 
     def check_bravo(self, ami, event):
-        self.log_last_step("Checking Userevent")
         if event.get("userevent").lower() != "bravo":
             return
+
         status = event.get("status")
-        print ("Status of bravo is %s" % (status))
+        LOGGER.debug("Status of bravo is %s" % (status))
         if status != "SUCCESS":
+            LOGGER.warn("Bravo failed")
             self.stop_reactor()
             return
-
+        LOGGER.info("Bravo succeeded")
         self.bravo = True
         self.check_result()
 
     def check_charlie(self, ami, event):
-        self.log_last_step("Checking Userevent")
         if event.get("userevent").lower() != "charlie":
             return
         status = event.get("status")
-        print ("Status of charlie is %s" % (status))
+        LOGGER.debug("Status of charlie is %s" % (status))
         if status != "SUCCESS":
+            LOGGER.warn("Charlie failed")
             self.stop_reactor()
             return
-
+        LOGGER.info("Charlie succeeded")
         self.charlie = True
         self.check_result()
 
-    def ami_on_connect2(self, ami):
-        self.log_last_step("Connected to AMI 2")
-        self.ami2 = ami
-        self.ami2.registerEvent("UserEvent", self.check_bravo)
+    def new_channel_handler(self, ami, event):
+        self.channels.append(event['channel'])
 
-    def ami_on_connect1(self, ami):
-        self.log_last_step("Connected to AMI 1")
-        self.ami1 = ami
-        self.ami1.registerEvent("UserEvent", self.check_alpha)
-        self.ami1.registerEvent("UserEvent", self.check_charlie)
+    def hangup_handler(self, ami, event):
+        if event['channel'] in self.channels:
+            self.channels.remove(event['channel'])
 
-    def ami_login_error(self, ami):
-        self.log_last_step("AMI login failed")
-        self.stop_reactor()
+    def ami_connect(self, ami):
+        """AMI Connection Handler"""
 
-    def ami_login(self):
-        self.log_last_step("Logging in to the AMI")
-        self.ami_factory1 = manager.AMIFactory("user", "mysecret")
-        self.ami_factory1.login('127.0.0.1', 5038).addCallbacks(self.ami_on_connect1, self.ami_login_error)
-        self.ami_factory2 = manager.AMIFactory("user", "mysecret")
-        self.ami_factory2.login('127.0.0.2', 5038).addCallbacks(self.ami_on_connect2, self.ami_login_error)
+        if (ami.id == 0):
+            ami.registerEvent("UserEvent", self.check_alpha)
+            ami.registerEvent("UserEvent", self.check_charlie)
+        else:
+            ami.registerEvent("UserEvent", self.check_bravo)
+            ami.registerEvent('Newchannel', self.new_channel_handler)
+            ami.registerEvent('Hangup', self.hangup_handler)
+
+        if len(self.ami) == 2:
+            self.launch_test()
 
     def run(self):
-        def __finish_start_ops(result):
-            self.ami_login()
-            self.launch_test()
+        super(BlondeTransferTest, self).run()
+        self.create_ami_factory(count=2)
 
-            # stop and read results after timeout
-            reactor.callLater(self.test_to, self.stop_reactor)
-
-        self.log_last_step("Starting Asterisk")
-        df1 = self.ast1.start()
-        df2 = self.ast2.start()
-        dl = defer.DeferredList([df1, df2])
-        dl.addCallback(__finish_start_ops)
 
 def main():
      # Run Blonde Transfer Test




More information about the asterisk-commits mailing list