[asterisk-commits] mjordan: testsuite/asterisk/trunk r5046 - in /asterisk/trunk/tests: feature_a...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu May 15 13:20:12 CDT 2014
Author: mjordan
Date: Thu May 15 13:20:09 2014
New Revision: 5046
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=5046
Log:
feature tests: Update to use TestCase
These tests failed as well due to not being based on TestCase. This patch
updates them just enough to get them running on top of TestCase.
Modified:
asterisk/trunk/tests/feature_attended_transfer/run-test
asterisk/trunk/tests/feature_blonde_transfer/run-test
Modified: asterisk/trunk/tests/feature_attended_transfer/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/feature_attended_transfer/run-test?view=diff&rev=5046&r1=5045&r2=5046
==============================================================================
--- asterisk/trunk/tests/feature_attended_transfer/run-test (original)
+++ asterisk/trunk/tests/feature_attended_transfer/run-test Thu May 15 13:20:09 2014
@@ -1,160 +1,141 @@
#!/usr/bin/env python
-'''
-Copyright (C) 2010, Digium, Inc.
+"""
+Copyright (C) 2010-2014, Digium, Inc.
David Vossel <dvossel at digium.com>
+Matt Jordan <mjordan at digium.com>
This program is free software, distributed under the terms of
the GNU General Public License Version 2.
-'''
+"""
import sys
import os
import math
+import logging
+
from twisted.application import service, internet
from twisted.internet import reactor, defer
-from starpy import manager
sys.path.append("lib/python")
-from asterisk.asterisk import Asterisk
+from asterisk.test_case import TestCase
-workingdir = "feature_attended_transfer"
-testdir = "tests/%s" % workingdir
+LOGGER = logging.getLogger(__name__)
-class BlondeTransferTest:
+class AttendedTransferTest(TestCase):
def __init__(self):
+ super(AttendedTransferTest, self).__init__()
+
self.passed = False
self.alpha = False
self.bravo = False
self.charlie = False
self.charlie_alpha = False
+ self.channels = []
# Test timeout in seconds
self.test_to = 45
-
- reactor.callWhenRunning(self.run)
-
- self.ast1 = Asterisk(base=workingdir)
- self.ast1.install_configs("%s/configs/ast1" % (testdir))
-
- self.ast2 = Asterisk(base=workingdir)
- self.ast2.install_configs("%s/configs/ast2" % (testdir))
+ self.create_asterisk(count=2)
def check_result(self):
- self.log_last_step("Checking results...")
+ LOGGER.debug("Checking results...")
if (self.alpha is True and self.bravo is True and self.charlie is True
and self.charlie_alpha is True):
+ LOGGER.info("Test passed")
+ for channel in self.channels:
+ LOGGER.debug("Hanging up %s" % channel)
+ self.ami[1].hangup(channel)
self.stop_reactor()
self.passed = True
- def stop_reactor(self):
- def __finish_stop(result):
- print "Stopping Reactor ..."
- if reactor.running:
- reactor.stop()
- return result
- df = self.ast1.stop()
- df.addCallback(__finish_stop)
-
def launch_test(self):
- self.ast2.cli_originate("Local/1000@transfertest extension a_dial@transfertest")
-
- def log_last_step(self, step):
- print step
- self.last_step = step
+ LOGGER.info("Starting the test...")
+ self.ast[1].cli_originate("Local/1000@transfertest extension a_dial@transfertest")
def check_alpha(self, ami, event):
- self.log_last_step("Checking Userevent")
if event.get("userevent").lower() != "alpha":
return
+
status = event.get("status")
- print ("Status of alpha is %s" % (status))
+ LOGGER.debug("Status of alpha is %s" % (status))
if status != "SUCCESS":
+ LOGGER.warn("Alpha failed")
self.stop_reactor()
return
-
+ LOGGER.info("Alpha succeeded")
self.alpha = True
self.check_result()
def check_bravo(self, ami, event):
- self.log_last_step("Checking Userevent")
if event.get("userevent").lower() != "bravo":
return
+
status = event.get("status")
- print ("Status of bravo is %s" % (status))
+ LOGGER.debug("Status of bravo is %s" % (status))
if status != "SUCCESS":
+ LOGGER.warn("Bravo failed")
self.stop_reactor()
return
-
+ LOGGER.info("Bravo succeeded")
self.bravo = True
self.check_result()
def check_charlie_alpha(self, ami, event):
- self.log_last_step("Checking Userevent")
if event.get("userevent").lower() != "charlie_alpha":
return
status = event.get("status")
- print ("Status of charlie_alpha is %s" % (status))
+ LOGGER.debug("Status of charlie_alpha is %s" % (status))
if status != "SUCCESS":
+ LOGGER.warn("Charlie_alpha failed")
self.stop_reactor()
return
-
+ LOGGER.info("Charlie_alpha succeeded")
self.charlie_alpha = True
self.check_result()
def check_charlie(self, ami, event):
- self.log_last_step("Checking Userevent")
if event.get("userevent").lower() != "charlie":
return
status = event.get("status")
- print ("Status of charlie is %s" % (status))
+ LOGGER.debug("Status of charlie is %s" % (status))
if status != "SUCCESS":
+ LOGGER.warn("Charlie failed")
self.stop_reactor()
return
-
+ LOGGER.info("Charlie succeeded")
self.charlie = True
self.check_result()
- def ami_on_connect2(self, ami):
- self.log_last_step("Connected to AMI 2")
- self.ami2 = ami
- self.ami2.registerEvent("UserEvent", self.check_bravo)
+ def new_channel_handler(self, ami, event):
+ self.channels.append(event['channel'])
- def ami_on_connect1(self, ami):
- self.log_last_step("Connected to AMI 1")
- self.ami1 = ami
- self.ami1.registerEvent("UserEvent", self.check_alpha)
- self.ami1.registerEvent("UserEvent", self.check_charlie)
- self.ami1.registerEvent("UserEvent", self.check_charlie_alpha)
+ def hangup_handler(self, ami, event):
+ if event['channel'] in self.channels:
+ self.channels.remove(event['channel'])
- def ami_login_error(self, ami):
- self.log_last_step("AMI login failed")
- self.stop_reactor()
+ def ami_connect(self, ami):
+ """AMI Connection Handler"""
- def ami_login(self):
- self.log_last_step("Logging in to the AMI")
- self.ami_factory1 = manager.AMIFactory("user", "mysecret")
- self.ami_factory1.login('127.0.0.1', 5038).addCallbacks(self.ami_on_connect1, self.ami_login_error)
- self.ami_factory2 = manager.AMIFactory("user", "mysecret")
- self.ami_factory2.login('127.0.0.2', 5038).addCallbacks(self.ami_on_connect2, self.ami_login_error)
+ if (ami.id == 0):
+ ami.registerEvent("UserEvent", self.check_alpha)
+ ami.registerEvent("UserEvent", self.check_charlie)
+ ami.registerEvent("UserEvent", self.check_charlie_alpha)
+ else:
+ ami.registerEvent("UserEvent", self.check_bravo)
+ ami.registerEvent('Newchannel', self.new_channel_handler)
+ ami.registerEvent('Hangup', self.hangup_handler)
+
+ if len(self.ami) == 2:
+ self.launch_test()
def run(self):
- def __finish_start_ops(result):
- self.ami_login()
- self.launch_test()
+ super(AttendedTransferTest, self).run()
- # stop and read results after timeout
- reactor.callLater(self.test_to, self.stop_reactor)
-
- self.log_last_step("Starting Asterisk")
- df1 = self.ast1.start()
- df2 = self.ast2.start()
- dl = defer.DeferredList([df1, df2])
- dl.addCallback(__finish_start_ops)
+ self.create_ami_factory(count=2)
def main():
- # Run Blonde Transfer Test
- test = BlondeTransferTest()
+
+ test = AttendedTransferTest()
reactor.run()
if test.passed != True:
return 1
Modified: asterisk/trunk/tests/feature_blonde_transfer/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/feature_blonde_transfer/run-test?view=diff&rev=5046&r1=5045&r2=5046
==============================================================================
--- asterisk/trunk/tests/feature_blonde_transfer/run-test (original)
+++ asterisk/trunk/tests/feature_blonde_transfer/run-test Thu May 15 13:20:09 2014
@@ -1,7 +1,8 @@
#!/usr/bin/env python
'''
-Copyright (C) 2010, Digium, Inc.
+Copyright (C) 2010-2014, Digium, Inc.
David Vossel <dvossel at digium.com>
+Matt Jordan <mjordan at digium.com>
This program is free software, distributed under the terms of
the GNU General Public License Version 2.
@@ -10,131 +11,107 @@
import sys
import os
import math
+import logging
+
from twisted.application import service, internet
from twisted.internet import reactor, defer
-from starpy import manager
sys.path.append("lib/python")
-from asterisk.asterisk import Asterisk
+from asterisk.test_case import TestCase
-workingdir = "feature_blonde_transfer"
-testdir = "tests/%s" % workingdir
+LOGGER = logging.getLogger(__name__)
-class BlondeTransferTest:
+class BlondeTransferTest(TestCase):
def __init__(self):
+ super(BlondeTransferTest, self).__init__()
self.passed = False
self.alpha = False
self.bravo = False
self.charlie = False
- # Test timeout in seconds
- self.test_to = 45
-
- reactor.callWhenRunning(self.run)
-
- self.ast1 = Asterisk(base=workingdir)
- self.ast1.install_configs("%s/configs/ast1" % (testdir))
-
- self.ast2 = Asterisk(base=workingdir)
- self.ast2.install_configs("%s/configs/ast2" % (testdir))
-
+ self.channels = []
+ self.create_asterisk(count=2)
def check_result(self):
- self.log_last_step("Checking results...")
+ LOGGER.debug("Checking results...")
if self.alpha is True and self.bravo is True and self.charlie is True:
+ LOGGER.info("Test passed")
+ for channel in self.channels:
+ LOGGER.debug("Hanging up %s" % channel)
+ self.ami[1].hangup(channel)
self.stop_reactor()
self.passed = True
- def stop_reactor(self):
- def __finish_stop(result):
- print "Stopping Reactor ..."
- if reactor.running:
- reactor.stop()
- return result
- df = self.ast1.stop()
- df.addCallback(__finish_stop)
-
def launch_test(self):
- self.ast2.cli_originate("Local/1000@transfertest extension a_dial@transfertest")
-
- def log_last_step(self, step):
- print step
- self.last_step = step
+ LOGGER.info("Starting test...")
+ self.ast[1].cli_originate("Local/1000@transfertest extension a_dial@transfertest")
def check_alpha(self, ami, event):
- self.log_last_step("Checking Userevent")
if event.get("userevent").lower() != "alpha":
return
+
status = event.get("status")
- print ("Status of alpha is %s" % (status))
+ LOGGER.debug("Status of alpha is %s" % (status))
if status != "SUCCESS":
+ LOGGER.warn("Alpha failed")
self.stop_reactor()
return
-
+ LOGGER.info("Alpha succeeded")
self.alpha = True
self.check_result()
def check_bravo(self, ami, event):
- self.log_last_step("Checking Userevent")
if event.get("userevent").lower() != "bravo":
return
+
status = event.get("status")
- print ("Status of bravo is %s" % (status))
+ LOGGER.debug("Status of bravo is %s" % (status))
if status != "SUCCESS":
+ LOGGER.warn("Bravo failed")
self.stop_reactor()
return
-
+ LOGGER.info("Bravo succeeded")
self.bravo = True
self.check_result()
def check_charlie(self, ami, event):
- self.log_last_step("Checking Userevent")
if event.get("userevent").lower() != "charlie":
return
status = event.get("status")
- print ("Status of charlie is %s" % (status))
+ LOGGER.debug("Status of charlie is %s" % (status))
if status != "SUCCESS":
+ LOGGER.warn("Charlie failed")
self.stop_reactor()
return
-
+ LOGGER.info("Charlie succeeded")
self.charlie = True
self.check_result()
- def ami_on_connect2(self, ami):
- self.log_last_step("Connected to AMI 2")
- self.ami2 = ami
- self.ami2.registerEvent("UserEvent", self.check_bravo)
+ def new_channel_handler(self, ami, event):
+ self.channels.append(event['channel'])
- def ami_on_connect1(self, ami):
- self.log_last_step("Connected to AMI 1")
- self.ami1 = ami
- self.ami1.registerEvent("UserEvent", self.check_alpha)
- self.ami1.registerEvent("UserEvent", self.check_charlie)
+ def hangup_handler(self, ami, event):
+ if event['channel'] in self.channels:
+ self.channels.remove(event['channel'])
- def ami_login_error(self, ami):
- self.log_last_step("AMI login failed")
- self.stop_reactor()
+ def ami_connect(self, ami):
+ """AMI Connection Handler"""
- def ami_login(self):
- self.log_last_step("Logging in to the AMI")
- self.ami_factory1 = manager.AMIFactory("user", "mysecret")
- self.ami_factory1.login('127.0.0.1', 5038).addCallbacks(self.ami_on_connect1, self.ami_login_error)
- self.ami_factory2 = manager.AMIFactory("user", "mysecret")
- self.ami_factory2.login('127.0.0.2', 5038).addCallbacks(self.ami_on_connect2, self.ami_login_error)
+ if (ami.id == 0):
+ ami.registerEvent("UserEvent", self.check_alpha)
+ ami.registerEvent("UserEvent", self.check_charlie)
+ else:
+ ami.registerEvent("UserEvent", self.check_bravo)
+ ami.registerEvent('Newchannel', self.new_channel_handler)
+ ami.registerEvent('Hangup', self.hangup_handler)
+
+ if len(self.ami) == 2:
+ self.launch_test()
def run(self):
- def __finish_start_ops(result):
- self.ami_login()
- self.launch_test()
+ super(BlondeTransferTest, self).run()
+ self.create_ami_factory(count=2)
- # stop and read results after timeout
- reactor.callLater(self.test_to, self.stop_reactor)
-
- self.log_last_step("Starting Asterisk")
- df1 = self.ast1.start()
- df2 = self.ast2.start()
- dl = defer.DeferredList([df1, df2])
- dl.addCallback(__finish_start_ops)
def main():
# Run Blonde Transfer Test
More information about the asterisk-commits
mailing list