[svn-commits] kmoore: testsuite/asterisk/trunk r5416 - /asterisk/trunk/lib/python/asterisk/
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Fri Aug 8 20:30:35 CDT 2014
Author: kmoore
Date: Fri Aug 8 20:30:33 2014
New Revision: 5416
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=5416
Log:
PEP8ify test_case.py
Modified:
asterisk/trunk/lib/python/asterisk/test_case.py
Modified: asterisk/trunk/lib/python/asterisk/test_case.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/lib/python/asterisk/test_case.py?view=diff&rev=5416&r1=5415&r2=5416
==============================================================================
--- asterisk/trunk/lib/python/asterisk/test_case.py (original)
+++ asterisk/trunk/lib/python/asterisk/test_case.py Fri Aug 8 20:30:33 2014
@@ -15,7 +15,7 @@
import uuid
from datetime import datetime
from hashlib import md5
-from twisted.internet import reactor, defer
+from twisted.internet import reactor, defer, error as twisted_error
from twisted.python import log
from starpy import manager, fastagi
@@ -58,19 +58,20 @@
LOGGER = logging.getLogger(__name__)
- fmt = '[%(asctime)s] %(levelname)s[%(process)d]: %(name)s:%(lineno)d %(funcName)s: %(message)s'
+ fmt = '[%(asctime)s] %(levelname)s[%(process)d]: %(name)s:%(lineno)d '\
+ '%(funcName)s: %(message)s'
datefmt = '%b %d %H:%M:%S'
form = logging.Formatter(fmt=fmt, datefmt=datefmt)
- fh = logging.FileHandler(os.path.join(log_dir, 'full.txt'))
- fh.setLevel(logging.DEBUG)
- fh.setFormatter(form)
- root_logger.addHandler(fh)
-
- mh = logging.FileHandler(os.path.join(log_dir, 'messages.txt'))
- mh.setLevel(logging.INFO)
- mh.setFormatter(form)
- root_logger.addHandler(mh)
+ full_handler = logging.FileHandler(os.path.join(log_dir, 'full.txt'))
+ full_handler.setLevel(logging.DEBUG)
+ full_handler.setFormatter(form)
+ root_logger.addHandler(full_handler)
+
+ messages_handler = logging.FileHandler(os.path.join(log_dir, 'messages.txt'))
+ messages_handler.setLevel(logging.INFO)
+ messages_handler.setFormatter(form)
+ root_logger.addHandler(messages_handler)
class TestCase(object):
@@ -192,25 +193,24 @@
# Get those global conditions that are not in the self conditions
for g_cond in global_conditions:
disallowed = [i for i in conditions \
- if (i[0].get_name() == g_cond[0].get_name() and
- i[1] == g_cond[1])]
- if (len(disallowed) == 0):
+ if i[0].get_name() == g_cond[0].get_name() \
+ and i[1] == g_cond[1]]
+ if len(disallowed) == 0:
conditions.append(g_cond)
for cond in conditions:
# cond is a 3-tuple of object, pre-post type, and related name
obj, pre_post_type, related_name = cond
- if (pre_post_type == "PRE"):
- self.condition_controller.register_pre_test_condition(obj)
- elif (pre_post_type == "POST"):
- self.condition_controller.register_post_test_condition(obj,
- related_name)
+ if pre_post_type == "PRE":
+ self.condition_controller.register_pre_test_condition(obj)
+ elif pre_post_type == "POST":
+ self.condition_controller.register_post_test_condition(\
+ obj, related_name)
else:
msg = "Unknown condition type [%s]" % pre_post_type
LOGGER.warning(msg)
- self.condition_controller.register_observer(
- self.handle_condition_failure,
- 'Failed')
+ self.condition_controller.register_observer(\
+ self.handle_condition_failure, 'Failed')
def create_asterisk(self, count=1, base_configs_path=None):
"""Create n instances of Asterisk
@@ -241,7 +241,7 @@
# Copy test specific config files
self.ast[i].install_configs("%s/configs/ast%d" %
(self.test_name, num),
- self.test_config.get_deps())
+ self.test_config.get_deps())
def create_ami_factory(self, count=1, username="user", secret="mysecret",
port=5038):
@@ -356,9 +356,9 @@
# Gather up the deferred objects from each of the instances of Asterisk
# and wait until all are finished before proceeding
start_defers = []
- for index, item in enumerate(self.ast):
+ for index, ast in enumerate(self.ast):
LOGGER.info("Starting Asterisk instance %d" % (index + 1))
- temp_defer = self.ast[index].start(self.test_config.get_deps())
+ temp_defer = ast.start(self.test_config.get_deps())
start_defers.append(temp_defer)
deferred = defer.DeferredList(start_defers, consumeErrors=True)
@@ -399,9 +399,9 @@
# Gather up the stopped defers; check success failure of stopping
# when all instances of Asterisk have stopped
stop_defers = []
- for index, item in enumerate(self.ast):
+ for index, ast in enumerate(self.ast):
LOGGER.info("Stopping Asterisk instance %d" % (index + 1))
- temp_defer = self.ast[index].stop()
+ temp_defer = ast.stop()
stop_defers.append(temp_defer)
defer.DeferredList(stop_defers).addCallback(
@@ -425,7 +425,7 @@
if reactor.running:
try:
reactor.stop()
- except twisted.internet.error.ReactorNotRunning:
+ except twisted_error.ReactorNotRunning:
# Something stopped it between our checks - at least we're
# stopped
pass
@@ -454,7 +454,7 @@
needs to first ensure that Asterisk is fully up and running before
moving on.
"""
- if (self.ast):
+ if self.ast:
self._start_asterisk()
else:
# If no instances of Asterisk are needed, go ahead and just run
@@ -468,7 +468,7 @@
Derived classes must call this implementation, as this method provides a
fail out mechanism in case the test hangs.
"""
- if (self.reactor_timeout > 0):
+ if self.reactor_timeout > 0:
self.timeout_id = reactor.callLater(self.reactor_timeout,
self._reactor_timeout)
@@ -518,7 +518,7 @@
Convenience callback handler for twisted deferred errors for an AMI
originate call. Derived classes can choose to add this handler to
- originate calls in order to handle them safely when they fail.
+ originate calls in order to handle them safely when they fail.
This will stop the test if called.
Keyword arguments:
@@ -531,7 +531,7 @@
def reset_timeout(self):
"""Resets the reactor timeout"""
- if (self.timeout_id != None):
+ if self.timeout_id != None:
original_time = datetime.fromtimestamp(self.timeout_id.getTime())
self.timeout_id.reset(self.reactor_timeout)
new_time = datetime.fromtimestamp(self.timeout_id.getTime())
@@ -627,8 +627,8 @@
fail_token A previously created fail token to be removed from the test
"""
if not fail_token in self.fail_tokens:
- LOGGER.warning('Attempted to remove an unknown fail token: %s'
- % fail_token['message'])
+ LOGGER.warning('Attempted to remove an unknown fail token: %s',
+ fail_token['message'])
self.passed = False
return
self.fail_tokens.remove(fail_token)
@@ -656,7 +656,7 @@
default_application = 'Echo'
- def __init__(self, test_path = '', test_config = None):
+ def __init__(self, test_path='', test_config=None):
"""Constructor
Parameters:
@@ -691,7 +691,8 @@
if 'expected_events' in test_config:
self.expected_events = test_config['expected_events']
if 'ignore-originate-failures' in test_config:
- self._ignore_originate_failures = test_config['ignore-originate-failures']
+ self._ignore_originate_failures =\
+ test_config['ignore-originate-failures']
if 'spawn-after-hangup' in test_config:
self._spawn_after_hangup = test_config['spawn-after-hangup']
if 'config-path' in test_config:
@@ -741,26 +742,28 @@
call_details['otherchannelid'] = None
if 'application' in call_details:
msg += " with application %s" % call_details['application']
- deferred = ami.originate(channel=call_details['channel'],
- application=call_details['application'],
- variable=call_details['variable'],
- account=call_details['account'],
- async=call_details['async'],
- channelid=call_details['channelid'],
- otherchannelid=call_details['otherchannelid'])
+ deferred = ami.originate(\
+ channel=call_details['channel'],
+ application=call_details['application'],
+ variable=call_details['variable'],
+ account=call_details['account'],
+ async=call_details['async'],
+ channelid=call_details['channelid'],
+ otherchannelid=call_details['otherchannelid'])
else:
msg += " to %s@%s at %s" % (call_details['exten'],
call_details['context'],
call_details['priority'],)
- deferred = ami.originate(channel=call_details['channel'],
- context=call_details['context'],
- exten=call_details['exten'],
- priority=call_details['priority'],
- variable=call_details['variable'],
- account=call_details['account'],
- async=call_details['async'],
- channelid=call_details['channelid'],
- otherchannelid=call_details['otherchannelid'])
+ deferred = ami.originate(\
+ channel=call_details['channel'],
+ context=call_details['context'],
+ exten=call_details['exten'],
+ priority=call_details['priority'],
+ variable=call_details['variable'],
+ account=call_details['account'],
+ async=call_details['async'],
+ channelid=call_details['channelid'],
+ otherchannelid=call_details['otherchannelid'])
if self._ignore_originate_failures:
deferred.addErrback(__swallow_originate_error)
else:
@@ -773,7 +776,7 @@
name that Asterisk created with the call we just originated
"""
- if (event['variable'] == 'testuniqueid'):
+ if event['variable'] == 'testuniqueid':
if (len([chan for chan in self._tracking_channels if
chan['testuniqueid'] == event['value']])):
@@ -801,10 +804,10 @@
candidate_channel = [chan for chan in self._tracking_channels
if chan['channel'] in event['channel']]
- if (len(candidate_channel)):
+ if len(candidate_channel):
LOGGER.debug("Channel %s hung up; removing" % event['channel'])
self._tracking_channels.remove(candidate_channel[0])
- if (self._spawn_after_hangup):
+ if self._spawn_after_hangup:
self._current_run += 1
self.__start_new_call(ami)
@@ -814,7 +817,7 @@
stop the test
"""
- if (self._current_run < len(self._test_runs)):
+ if self._current_run < len(self._test_runs):
self.__originate_call(ami, self._test_runs[self._current_run])
else:
LOGGER.info("All calls executed, stopping")
More information about the svn-commits mailing list