[svn-commits] kmoore: testsuite/asterisk/trunk r5416 - /asterisk/trunk/lib/python/asterisk/

SVN commits to the Digium repositories svn-commits at lists.digium.com
Fri Aug 8 20:30:35 CDT 2014


Author: kmoore
Date: Fri Aug  8 20:30:33 2014
New Revision: 5416

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=5416
Log:
PEP8ify test_case.py

Modified:
    asterisk/trunk/lib/python/asterisk/test_case.py

Modified: asterisk/trunk/lib/python/asterisk/test_case.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/trunk/lib/python/asterisk/test_case.py?view=diff&rev=5416&r1=5415&r2=5416
==============================================================================
--- asterisk/trunk/lib/python/asterisk/test_case.py (original)
+++ asterisk/trunk/lib/python/asterisk/test_case.py Fri Aug  8 20:30:33 2014
@@ -15,7 +15,7 @@
 import uuid
 from datetime import datetime
 from hashlib import md5
-from twisted.internet import reactor, defer
+from twisted.internet import reactor, defer, error as twisted_error
 from twisted.python import log
 from starpy import manager, fastagi
 
@@ -58,19 +58,20 @@
 
     LOGGER = logging.getLogger(__name__)
 
-    fmt = '[%(asctime)s] %(levelname)s[%(process)d]: %(name)s:%(lineno)d %(funcName)s: %(message)s'
+    fmt = '[%(asctime)s] %(levelname)s[%(process)d]: %(name)s:%(lineno)d '\
+        '%(funcName)s: %(message)s'
     datefmt = '%b %d %H:%M:%S'
     form = logging.Formatter(fmt=fmt, datefmt=datefmt)
 
-    fh = logging.FileHandler(os.path.join(log_dir, 'full.txt'))
-    fh.setLevel(logging.DEBUG)
-    fh.setFormatter(form)
-    root_logger.addHandler(fh)
-
-    mh = logging.FileHandler(os.path.join(log_dir, 'messages.txt'))
-    mh.setLevel(logging.INFO)
-    mh.setFormatter(form)
-    root_logger.addHandler(mh)
+    full_handler = logging.FileHandler(os.path.join(log_dir, 'full.txt'))
+    full_handler.setLevel(logging.DEBUG)
+    full_handler.setFormatter(form)
+    root_logger.addHandler(full_handler)
+
+    messages_handler = logging.FileHandler(os.path.join(log_dir, 'messages.txt'))
+    messages_handler.setLevel(logging.INFO)
+    messages_handler.setFormatter(form)
+    root_logger.addHandler(messages_handler)
 
 
 class TestCase(object):
@@ -192,25 +193,24 @@
         # Get those global conditions that are not in the self conditions
         for g_cond in global_conditions:
             disallowed = [i for i in conditions \
-                          if (i[0].get_name() == g_cond[0].get_name() and
-                              i[1] == g_cond[1])]
-            if (len(disallowed) == 0):
+                if i[0].get_name() == g_cond[0].get_name() \
+                and i[1] == g_cond[1]]
+            if len(disallowed) == 0:
                 conditions.append(g_cond)
 
         for cond in conditions:
             # cond is a 3-tuple of object, pre-post type, and related name
             obj, pre_post_type, related_name = cond
-            if (pre_post_type == "PRE"):
-               self.condition_controller.register_pre_test_condition(obj)
-            elif (pre_post_type == "POST"):
-                self.condition_controller.register_post_test_condition(obj,
-                                                                       related_name)
+            if pre_post_type == "PRE":
+                self.condition_controller.register_pre_test_condition(obj)
+            elif pre_post_type == "POST":
+                self.condition_controller.register_post_test_condition(\
+                    obj, related_name)
             else:
                 msg = "Unknown condition type [%s]" % pre_post_type
                 LOGGER.warning(msg)
-        self.condition_controller.register_observer(
-                                    self.handle_condition_failure,
-                                    'Failed')
+        self.condition_controller.register_observer(\
+            self.handle_condition_failure, 'Failed')
 
     def create_asterisk(self, count=1, base_configs_path=None):
         """Create n instances of Asterisk
@@ -241,7 +241,7 @@
             # Copy test specific config files
             self.ast[i].install_configs("%s/configs/ast%d" %
                                         (self.test_name, num),
-                                         self.test_config.get_deps())
+                                        self.test_config.get_deps())
 
     def create_ami_factory(self, count=1, username="user", secret="mysecret",
                            port=5038):
@@ -356,9 +356,9 @@
         # Gather up the deferred objects from each of the instances of Asterisk
         # and wait until all are finished before proceeding
         start_defers = []
-        for index, item in enumerate(self.ast):
+        for index, ast in enumerate(self.ast):
             LOGGER.info("Starting Asterisk instance %d" % (index + 1))
-            temp_defer = self.ast[index].start(self.test_config.get_deps())
+            temp_defer = ast.start(self.test_config.get_deps())
             start_defers.append(temp_defer)
 
         deferred = defer.DeferredList(start_defers, consumeErrors=True)
@@ -399,9 +399,9 @@
             # Gather up the stopped defers; check success failure of stopping
             # when all instances of Asterisk have stopped
             stop_defers = []
-            for index, item in enumerate(self.ast):
+            for index, ast in enumerate(self.ast):
                 LOGGER.info("Stopping Asterisk instance %d" % (index + 1))
-                temp_defer = self.ast[index].stop()
+                temp_defer = ast.stop()
                 stop_defers.append(temp_defer)
 
             defer.DeferredList(stop_defers).addCallback(
@@ -425,7 +425,7 @@
             if reactor.running:
                 try:
                     reactor.stop()
-                except twisted.internet.error.ReactorNotRunning:
+                except twisted_error.ReactorNotRunning:
                     # Something stopped it between our checks - at least we're
                     # stopped
                     pass
@@ -454,7 +454,7 @@
         needs to first ensure that Asterisk is fully up and running before
         moving on.
         """
-        if (self.ast):
+        if self.ast:
             self._start_asterisk()
         else:
             # If no instances of Asterisk are needed, go ahead and just run
@@ -468,7 +468,7 @@
         Derived classes must call this implementation, as this method provides a
         fail out mechanism in case the test hangs.
         """
-        if (self.reactor_timeout > 0):
+        if self.reactor_timeout > 0:
             self.timeout_id = reactor.callLater(self.reactor_timeout,
                                                 self._reactor_timeout)
 
@@ -518,7 +518,7 @@
 
         Convenience callback handler for twisted deferred errors for an AMI
         originate call. Derived classes can choose to add this handler to
-        originate calls in order to handle them safely when they fail. 
+        originate calls in order to handle them safely when they fail.
         This will stop the test if called.
 
         Keyword arguments:
@@ -531,7 +531,7 @@
 
     def reset_timeout(self):
         """Resets the reactor timeout"""
-        if (self.timeout_id != None):
+        if self.timeout_id != None:
             original_time = datetime.fromtimestamp(self.timeout_id.getTime())
             self.timeout_id.reset(self.reactor_timeout)
             new_time = datetime.fromtimestamp(self.timeout_id.getTime())
@@ -627,8 +627,8 @@
         fail_token A previously created fail token to be removed from the test
         """
         if not fail_token in self.fail_tokens:
-            LOGGER.warning('Attempted to remove an unknown fail token: %s'
-                % fail_token['message'])
+            LOGGER.warning('Attempted to remove an unknown fail token: %s',
+                           fail_token['message'])
             self.passed = False
             return
         self.fail_tokens.remove(fail_token)
@@ -656,7 +656,7 @@
 
     default_application = 'Echo'
 
-    def __init__(self, test_path = '', test_config = None):
+    def __init__(self, test_path='', test_config=None):
         """Constructor
 
         Parameters:
@@ -691,7 +691,8 @@
             if 'expected_events' in test_config:
                 self.expected_events = test_config['expected_events']
             if 'ignore-originate-failures' in test_config:
-                self._ignore_originate_failures = test_config['ignore-originate-failures']
+                self._ignore_originate_failures =\
+                    test_config['ignore-originate-failures']
             if 'spawn-after-hangup' in test_config:
                 self._spawn_after_hangup = test_config['spawn-after-hangup']
             if 'config-path' in test_config:
@@ -741,26 +742,28 @@
             call_details['otherchannelid'] = None
         if 'application' in call_details:
             msg += " with application %s" % call_details['application']
-            deferred = ami.originate(channel=call_details['channel'],
-                                     application=call_details['application'],
-                                     variable=call_details['variable'],
-                                     account=call_details['account'],
-                                     async=call_details['async'],
-                                     channelid=call_details['channelid'],
-                                     otherchannelid=call_details['otherchannelid'])
+            deferred = ami.originate(\
+                channel=call_details['channel'],
+                application=call_details['application'],
+                variable=call_details['variable'],
+                account=call_details['account'],
+                async=call_details['async'],
+                channelid=call_details['channelid'],
+                otherchannelid=call_details['otherchannelid'])
         else:
             msg += " to %s@%s at %s" % (call_details['exten'],
                                         call_details['context'],
                                         call_details['priority'],)
-            deferred = ami.originate(channel=call_details['channel'],
-                                     context=call_details['context'],
-                                     exten=call_details['exten'],
-                                     priority=call_details['priority'],
-                                     variable=call_details['variable'],
-                                     account=call_details['account'],
-                                     async=call_details['async'],
-                                     channelid=call_details['channelid'],
-                                     otherchannelid=call_details['otherchannelid'])
+            deferred = ami.originate(\
+                channel=call_details['channel'],
+                context=call_details['context'],
+                exten=call_details['exten'],
+                priority=call_details['priority'],
+                variable=call_details['variable'],
+                account=call_details['account'],
+                async=call_details['async'],
+                channelid=call_details['channelid'],
+                otherchannelid=call_details['otherchannelid'])
         if self._ignore_originate_failures:
             deferred.addErrback(__swallow_originate_error)
         else:
@@ -773,7 +776,7 @@
         name that Asterisk created with the call we just originated
         """
 
-        if (event['variable'] == 'testuniqueid'):
+        if event['variable'] == 'testuniqueid':
 
             if (len([chan for chan in self._tracking_channels if
                      chan['testuniqueid'] == event['value']])):
@@ -801,10 +804,10 @@
 
         candidate_channel = [chan for chan in self._tracking_channels
                              if chan['channel'] in event['channel']]
-        if (len(candidate_channel)):
+        if len(candidate_channel):
             LOGGER.debug("Channel %s hung up; removing" % event['channel'])
             self._tracking_channels.remove(candidate_channel[0])
-            if (self._spawn_after_hangup):
+            if self._spawn_after_hangup:
                 self._current_run += 1
                 self.__start_new_call(ami)
 
@@ -814,7 +817,7 @@
         stop the test
         """
 
-        if (self._current_run < len(self._test_runs)):
+        if self._current_run < len(self._test_runs):
             self.__originate_call(ami, self._test_runs[self._current_run])
         else:
             LOGGER.info("All calls executed, stopping")




More information about the svn-commits mailing list