[asterisk-commits] rmudgett: testsuite/asterisk/trunk r4939 - /asterisk/trunk/tests/masquerade/
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Fri Apr 4 18:44:54 CDT 2014
Author: rmudgett
Date: Fri Apr 4 18:44:48 2014
New Revision: 4939
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=4939
Log:
testsuite: Made PEP8 checker happy with the masquerade supertest.
Modified:
asterisk/trunk/tests/masquerade/run-test
Modified: asterisk/trunk/tests/masquerade/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/tests/masquerade/run-test?view=diff&rev=4939&r1=4938&r2=4939
==============================================================================
--- asterisk/trunk/tests/masquerade/run-test (original)
+++ asterisk/trunk/tests/masquerade/run-test Fri Apr 4 18:44:48 2014
@@ -15,7 +15,7 @@
from asterisk.test_case import TestCase
from asterisk.version import AsteriskVersion
-logger = logging.getLogger(__name__)
+LOGGER = logging.getLogger(__name__)
class MasqueradeTest(TestCase):
@@ -39,7 +39,7 @@
self.chain_length = 200
self.expected_space = 701
# Use IAX calls
- #self.use_sip_calls = False
+ # self.use_sip_calls = False
# Use SIP calls
self.use_sip_calls = True
@@ -61,6 +61,8 @@
self.create_asterisk(2)
def check_result(self):
+ """Examine the current result standing to determine when the next
+ phase of the test needs to be started."""
if not self.got_AMI_ParkedCall:
return
if not self.park_retrieved:
@@ -78,90 +80,100 @@
return
# Call chain completed successfully.
- logger.info("Test completed successfully")
+ LOGGER.info("Test completed successfully")
self.passed = True
self.stop_reactor()
- # Initiate test call chain.
def initiate_call_chain(self):
- logger.info("Initiating test call for a chain length of " + str(self.chain_length))
+ """Initiates the test call chain."""
+ LOGGER.info(
+ "Initiating test call for a chain length of "
+ + str(self.chain_length))
exten = self.base_exten + self.chain_length
if AsteriskVersion() < AsteriskVersion('12'):
- self.ami[0].originate(channel="Local/" + str(exten) + "@outgoing",
+ self.ami[0].originate(
+ channel="Local/" + str(exten) + "@outgoing",
context="parked", exten="parkme", priority=1,
- timeout=900,
- async=True).addErrback(TestCase.handle_originate_failure)
- return
- self.ami[0].originate(channel="Local/" + str(exten) + "@outgoing",
+ timeout=900, async=True
+ ).addErrback(TestCase.handle_originate_failure)
+ return
+ self.ami[0].originate(
+ channel="Local/" + str(exten) + "@outgoing",
context="parked", exten="parkme12", priority=1,
- timeout=900,
- async=True).addErrback(TestCase.handle_originate_failure)
-
- # Initiate a call to retrieve the parked call.
+ timeout=900, async=True
+ ).addErrback(TestCase.handle_originate_failure)
+
def get_parkedcall(self):
- logger.info("Fetching parked call at " + str(self.expected_space))
+ """Initiates a call to retrieve the parked call."""
+ LOGGER.info("Fetching parked call at " + str(self.expected_space))
tech_prefix = "IAX2/ast1/"
if self.use_sip_calls:
tech_prefix = "SIP/ast1/"
- self.ami[1].originate(channel=tech_prefix + str(self.expected_space),
+ self.ami[1].originate(
+ channel=tech_prefix + str(self.expected_space),
context="getit", exten="retrieve", priority=1,
- timeout=30,
- async=True).addErrback(TestCase.handle_originate_failure)
-
- # Called when got ParkedCall AMI event
+ timeout=30, async=True
+ ).addErrback(TestCase.handle_originate_failure)
+
def evt_parkedcall(self, ami, event):
+ """Called when got the ParkedCall AMI event"""
if self.test_complete:
return
exten = event.get(self.parking_space_hdr)
if not exten or exten != str(self.expected_space):
- logger.warning("Call not parked in expected space " + str(self.expected_space))
+ LOGGER.warning(
+ "Call not parked in expected space "
+ + str(self.expected_space))
self.stop_reactor()
return
- logger.info("Call parked at exten: " + str(exten))
+ LOGGER.info("Call parked at exten: " + str(exten))
self.got_AMI_ParkedCall = True
self.check_result()
- # Called when got UnParkedCall AMI event
def evt_unparkedcall(self, ami, event):
+ """Called when got the UnParkedCall AMI event"""
if self.test_complete:
return
exten = event.get(self.parking_space_hdr)
if not exten or exten != str(self.expected_space):
- logger.warning("Call not retrieved from expected space " + str(self.expected_space))
+ LOGGER.warning(
+ "Call not retrieved from expected space "
+ + str(self.expected_space))
self.stop_reactor()
return
- logger.info("Parked call retrieved from exten: " + str(exten))
+ LOGGER.info("Parked call retrieved from exten: " + str(exten))
self.got_AMI_UnParkedCall = True
self.check_result()
- # Called when got ParkedCallTimeOut AMI event
def evt_parkedcalltimeout(self, ami, event):
- if self.test_complete:
- return
- logger.warning("Call park timeout!")
+ """Called when got the ParkedCallTimeOut AMI event"""
+ if self.test_complete:
+ return
+ LOGGER.warning("Call park timeout!")
self.stop_reactor()
- # Called when got UserEvent event from either Asterisk instance
def evt_userevent(self, ami, event):
+ """Called when got the UserEvent event from either Asterisk instance"""
if self.test_complete:
return
# We want the AMI UserEvent header but the headers put
# in as dictionary keys are lowercased.
evt = event.get("userevent")
status = event.get("status")
- logger.info("UserEvent: " + str(evt) + " status: " + str(status))
+ LOGGER.info("UserEvent: " + str(evt) + " status: " + str(status))
if evt == "dialing":
self.dialed_count += 1
- logger.info("Calls dialed: " + str(self.dialed_count))
+ LOGGER.info("Calls dialed: " + str(self.dialed_count))
# Prod reactor timeout
self.reset_timeout()
return
elif evt == "optout":
self.removed_count += 1
- logger.info("Calls hungup or optimized out: " + str(self.removed_count))
+ LOGGER.info(
+ "Calls hungup or optimized out: " + str(self.removed_count))
# Prod reactor timeout
self.reset_timeout()
self.check_result()
@@ -174,9 +186,10 @@
elif status == "HANGUP":
if self.park_retrieved:
return
- logger.warning("The last call in the chain disconnected too early.")
+ LOGGER.warning(
+ "The last call in the chain disconnected too early.")
else:
- logger.info("Unknown status")
+ LOGGER.info("Unknown status")
elif evt == "ast1":
if status == "SUCCESS":
self.ast1_event = True
@@ -188,19 +201,21 @@
self.check_result()
return
else:
- logger.info("Unknown UserEvent")
- return
- logger.warning("UserEvent: " + str(evt) + " status: " + str(status))
+ LOGGER.info("Unknown UserEvent")
+ return
+ LOGGER.warning("UserEvent: " + str(evt) + " status: " + str(status))
self.stop_reactor()
- # This is called for each AMI connection established.
def ami_connect(self, ami):
+ """This is called for each AMI connection established."""
# Add AMI event triggers
if ami.id == 0:
# Ast1 events to handle
self.ami[ami.id].registerEvent("ParkedCall", self.evt_parkedcall)
- self.ami[ami.id].registerEvent("UnParkedCall", self.evt_unparkedcall)
- self.ami[ami.id].registerEvent("ParkedCallTimeOut", self.evt_parkedcalltimeout)
+ self.ami[ami.id].registerEvent(
+ "UnParkedCall", self.evt_unparkedcall)
+ self.ami[ami.id].registerEvent(
+ "ParkedCallTimeOut", self.evt_parkedcalltimeout)
self.ami[ami.id].registerEvent("UserEvent", self.evt_userevent)
elif ami.id == 1:
# Ast2 events to handle
@@ -211,26 +226,26 @@
if self.ami_count == 2:
self.initiate_call_chain()
- # This is called when the reactor has started running.
def run(self):
+ """This is called when the reactor has started running."""
TestCase.run(self)
self.create_ami_factory(2)
def send_core_show_locks(self):
"""Send a core show locks CLI command"""
- logger.debug('sending core show locks')
+ LOGGER.debug('sending core show locks')
cli_deferred = self.ast[0].cli_exec("core show locks")
cli_deferred.addCallbacks(self.send_core_show_locks_callback)
def send_core_show_locks_callback(self, cli_command):
"""Callback handler for the core show locks CLI command output"""
- logger.info("Output for core show locks:")
- logger.info(cli_command.output)
+ LOGGER.info("Output for core show locks:")
+ LOGGER.info(cli_command.output)
TestCase.stop_reactor(self)
return cli_command
- # This is called when we stop the reactor.
def stop_reactor(self):
+ """This is called when we stop the reactor."""
self.test_complete = True
# Try to interpret results to reduce stupid assumptions
@@ -238,31 +253,35 @@
if self.passed:
# Nothing to say about the test passing.
pass
- elif not self.got_AMI_ParkedCall and self.chain_length != self.dialed_count:
- logger.warning("Only dialed " + str(self.dialed_count) + " calls in chain.")
+ elif not self.got_AMI_ParkedCall \
+ and self.chain_length != self.dialed_count:
+ LOGGER.warning(
+ "Only dialed " + str(self.dialed_count) + " calls in chain.")
elif not self.got_AMI_ParkedCall:
- logger.warning("Call did not get parked.")
+ LOGGER.warning("Call did not get parked.")
elif self.chain_length != self.removed_count:
- logger.warning("Only optimized " + str(self.removed_count) + " calls in chain.")
+ LOGGER.warning(
+ "Only optimized " + str(self.removed_count)
+ + " calls in chain.")
elif not self.got_AMI_UnParkedCall:
- logger.warning("Call did not get retrieved from parking lot.")
+ LOGGER.warning("Call did not get retrieved from parking lot.")
elif not self.ast1_event or not self.ast2_event:
- logger.warning("DTMF handshake failed.")
+ LOGGER.warning("DTMF handshake failed.")
# Get some useful output from the Asterisk instance
# under test before tearing down in case of problems.
- #self.send_core_show_locks()
+ # self.send_core_show_locks()
# Comment this out when enabling "core show locks"
TestCase.stop_reactor(self)
def main():
- # Run Masquerade local channel chain Test
+ """Run Masquerade local channel chain Test"""
test = MasqueradeTest()
reactor.run()
if test.passed:
- logger.info("Test passed")
+ LOGGER.info("Test passed")
return 0
return 1
More information about the asterisk-commits
mailing list