[asterisk-dev] Change in testsuite[master]: PEP8 fixes
Corey Farrell (Code Review)
asteriskteam at digium.com
Fri Apr 10 13:24:21 CDT 2015
Corey Farrell has uploaded a new change for review.
https://gerrit.asterisk.org/40
Change subject: PEP8 fixes
......................................................................
PEP8 fixes
* Add tox.ini for pep8 to set max-line-length=90
* Resolve most PEP8 findings in runtests.py and lib.
Change-Id: I55bcaab21c54f9040594f51c57f0efe30a219a62
---
M lib/python/asterisk/ami.py
M lib/python/asterisk/apptest.py
M lib/python/asterisk/ari.py
M lib/python/asterisk/astcsv.py
M lib/python/asterisk/asterisk.py
M lib/python/asterisk/bridge_test_case.py
M lib/python/asterisk/buildoptions.py
M lib/python/asterisk/cdr.py
M lib/python/asterisk/cel.py
M lib/python/asterisk/confbridge.py
M lib/python/asterisk/config.py
M lib/python/asterisk/dns_server.py
M lib/python/asterisk/fd_test_condition.py
M lib/python/asterisk/lock_test_condition.py
M lib/python/asterisk/pcap.py
M lib/python/asterisk/phones.py
M lib/python/asterisk/pjsua_mod.py
M lib/python/asterisk/pluggable_modules.py
M lib/python/asterisk/pluggable_registry.py
M lib/python/asterisk/sip_channel_test_condition.py
M lib/python/asterisk/sip_dialog_test_condition.py
M lib/python/asterisk/sipp.py
M lib/python/asterisk/sippversion.py
M lib/python/asterisk/syncami.py
M lib/python/asterisk/test_case.py
M lib/python/asterisk/test_conditions.py
M lib/python/asterisk/test_config.py
M lib/python/asterisk/test_runner.py
M lib/python/asterisk/test_state.py
M lib/python/asterisk/test_suite_utils.py
M lib/python/asterisk/thread_test_condition.py
M lib/python/asterisk/version.py
M lib/python/asterisk/voicemail.py
M lib/python/pcap_listener.py
M lib/python/qm.py
M lib/python/rlmi.py
M lib/python/sip_message.py
M runtests.py
A tox.ini
39 files changed, 596 insertions(+), 516 deletions(-)
git pull ssh://gerrit.asterisk.org:29418/testsuite refs/changes/40/40/1
diff --git a/lib/python/asterisk/ami.py b/lib/python/asterisk/ami.py
index 1dd5221..3cbeeaf 100644
--- a/lib/python/asterisk/ami.py
+++ b/lib/python/asterisk/ami.py
@@ -14,9 +14,10 @@
import re
import json
from pluggable_registry import PLUGGABLE_EVENT_REGISTRY,\
- PLUGGABLE_ACTION_REGISTRY, var_replace
+ PLUGGABLE_ACTION_REGISTRY, var_replace
LOGGER = logging.getLogger(__name__)
+
class AMIEventInstance(object):
"""Base class for specific instances of AMI event observers
@@ -37,10 +38,8 @@
conditions = instance_config['conditions']
self.match_conditions = conditions['match']
self.nonmatch_conditions = conditions.get('nomatch', {})
- self.ids = instance_config['id'].split(',') if 'id' in instance_config\
- else ['0']
- self.action = instance_config['action'] if 'action' in instance_config\
- else 'none'
+ self.ids = instance_config['id'].split(',') if 'id' in instance_config else ['0']
+ self.action = instance_config['action'] if 'action' in instance_config else 'none'
self.config = instance_config
self.passed = True
self._registered = False
@@ -75,7 +74,7 @@
# test's yaml then create the dict with setting the Event to 'CEL'.
# Otherwise set Event to 'CEL' since it's the only Event we want.
if instance_config['conditions']['match'] is None:
- instance_config['conditions']['match'] = {'Event' : 'CEL'}
+ instance_config['conditions']['match'] = {'Event': 'CEL'}
self.match_conditions = instance_config['conditions']['match']
else:
instance_config['conditions']['match']['Event'] = 'CEL'
@@ -118,7 +117,7 @@
def dispose(self, ami):
"""Dispose of this object's AMI event registrations"""
if str(ami.id) not in self.ids:
- LOGGER.warning("Unable to dispose of AMIEventInstance - " \
+ LOGGER.warning("Unable to dispose of AMIEventInstance - "
"unknown AMI object %d", ami.id)
return
ami.deregisterEvent(self.match_conditions['Event'],
@@ -209,9 +208,9 @@
LOGGER.debug("Initializing an AMIHeaderMatchInstance")
if 'requirements' in instance_config:
self.match_requirements =\
- instance_config['requirements'].get('match', {})
+ instance_config['requirements'].get('match', {})
self.nonmatch_requirements =\
- instance_config['requirements'].get('nomatch', {})
+ instance_config['requirements'].get('nomatch', {})
else:
self.match_requirements = {}
self.nonmatch_requirements = {}
@@ -224,7 +223,7 @@
key, event['event'])
self.passed = False
elif not re.match(value, event.get(key.lower())):
- LOGGER.warning("Requirement %s: %s does not match %s: %s in " \
+ LOGGER.warning("Requirement %s: %s does not match %s: %s in "
"event", key, value, key,
event.get(key.lower(), ''))
self.passed = False
@@ -242,7 +241,7 @@
key, value, key, event.get(key.lower(), ''))
self.passed = False
else:
- LOGGER.debug("Requirement %s: %s does not match %s: %s " \
+ LOGGER.debug("Requirement %s: %s does not match %s: %s "
"in event", key, value, key,
event.get(key.lower(), ''))
@@ -292,7 +291,7 @@
key, event['event'])
self.passed = False
elif not re.match(value, event.get(key.lower())):
- LOGGER.warning("Requirement %s: %s does not match %s: " \
+ LOGGER.warning("Requirement %s: %s does not match %s: "
"%s in event", key, value, key,
event.get(key.lower()))
self.passed = False
@@ -340,8 +339,8 @@
for key, value in requirements['match'].items():
lower_key = key.lower()
if lower_key == 'extra':
- value = dict((key.lower(), value)\
- for key, value in value.iteritems())
+ value = dict((key.lower(), value)
+ for key, value in value.iteritems())
self.requirements[lower_key] = value
self.orderings = requirements.get('partialorder') or []
self.named_id = requirements.get('id')
@@ -364,8 +363,8 @@
if extra_item is None:
continue
extra_match = re.match(extra_item, str(extra_value))
- if extra_match is None or\
- extra_match.end() != len(str(extra_value)):
+ if extra_match is None \
+ or extra_match.end() != len(str(extra_value)):
LOGGER.debug('Skipping %s - %s does not equal %s for '
'extra-subfield %s', event['eventname'],
extra_item, str(extra_value), extra_key)
@@ -419,8 +418,8 @@
# Add of all our named events to the lists of events that haven't
# occurred yet
- named_events = [ev for ev in self.match_requirements if\
- ev.named_id is not None]
+ named_events = [ev for ev in self.match_requirements
+ if ev.named_id is not None]
AMICelInstance.unmatched_cel_events.extend(named_events)
AMICelInstance.ami_cel_instances.append(self)
@@ -470,15 +469,15 @@
for order_type, named_event in cel_requirement.orderings.items():
order_type = order_type.lower()
if order_type == 'after':
- matches = [ev for ev in AMICelInstance.matched_cel_events if\
- ev.named_id == named_event]
+ matches = [ev for ev in AMICelInstance.matched_cel_events
+ if ev.named_id == named_event]
if len(matches) == 0:
LOGGER.warning('Event %s did not occur after %s; failing',
str(cel_requirement), named_event)
self.test_object.set_passed(False)
elif order_type == 'before':
- matches = [ev for ev in AMICelInstance.unmatched_cel_events if\
- ev.named_id == named_event]
+ matches = [ev for ev in AMICelInstance.unmatched_cel_events
+ if ev.named_id == named_event]
if len(matches) == 0:
LOGGER.warning('Event %s did not occur before %s; failing',
str(cel_requirement), named_event)
@@ -512,7 +511,7 @@
callback_module = __import__(self.callback_module)
method = getattr(callback_module, self.callback_method)
self.passed = method(ami, event)
- if self.passed == None:
+ if self.passed is None:
LOGGER.error("Callback %s.%s returned None instead of a boolean",
self.callback_module, self.callback_method)
self.passed = False
@@ -521,6 +520,7 @@
"""Deferred callback called when this object should verify pass/fail"""
self.test_object.set_passed(self.passed)
return callback_param
+
class AMIEventInstanceFactory:
"""Factory object that builds concrete instances of various AMIEventModules.
@@ -558,6 +558,7 @@
instance_type)
raise Exception
+
class AMIEventModule(object):
"""Pluggable module for AMI event matching
@@ -580,6 +581,7 @@
event_instance = AMIEventInstanceFactory\
.create_instance(instance, test_object)
self.ami_instances.append(event_instance)
+
class AMI(object):
"""Class that manages a connection to Asterisk over AMI"""
@@ -649,6 +651,7 @@
reactor.callLater(delay, self.login)
return reason
+
class AMIStartEventModule(object):
"""An event module that triggers when the test starts."""
@@ -664,6 +667,7 @@
self.triggered_callback(self, ami)
PLUGGABLE_EVENT_REGISTRY.register("ami-start", AMIStartEventModule)
+
class AMIPluggableEventInstance(AMIHeaderMatchInstance):
"""Subclass of AMIEventInstance that works with the pluggable event action
module.
@@ -677,13 +681,13 @@
def __init__(self, test_object, triggered_callback, config, data):
"""Setup the AMI event observer"""
self.triggered_callback = triggered_callback
- self.data = data
+ self.data = data
self.trigger_on_count = config.get('trigger-on-count', False)
super(AMIPluggableEventInstance, self).__init__(config, test_object)
def event_callback(self, ami, event):
"""Callback called when an event is received from AMI"""
- super(AMIPluggableEventInstance, self).event_callback(ami, event)
+ super(AMIPluggableEventInstance, self).event_callback(ami, event)
if self.passed and (not self.trigger_on_count or
self.count['event'] == self.count['min']):
self.triggered_callback(self.data, ami, event)
@@ -705,6 +709,7 @@
self))
PLUGGABLE_EVENT_REGISTRY.register("ami-events", AMIPluggableEventModule)
+
def replace_ami_vars(mydict, values):
outdict = {}
for key, value in mydict.iteritems():
@@ -712,6 +717,7 @@
return outdict
+
class AMIPluggableActionModule(object):
"""Pluggable AMI action module.
"""
diff --git a/lib/python/asterisk/apptest.py b/lib/python/asterisk/apptest.py
index d2d35d8..bb4d861 100644
--- a/lib/python/asterisk/apptest.py
+++ b/lib/python/asterisk/apptest.py
@@ -23,6 +23,7 @@
LOGGER = logging.getLogger(__name__)
+
class AppTest(TestCase):
"""A pluggable test object suitable for orchestrating tests against long
running Asterisk applications"""
@@ -59,7 +60,7 @@
self.raw_test_config = test_config
if 'app' in self.raw_test_config:
- self._applications = [ self.raw_test_config['app'] ]
+ self._applications = [self.raw_test_config['app']]
elif 'apps' in self.raw_test_config:
self._applications = self.raw_test_config['apps']
else:
@@ -222,18 +223,18 @@
self._channel_id = channel_def['channel-id']
self._channel_name = channel_def['channel-name']
self._applications = applications
- self._controller_context = channel_def.get('context') or \
- ChannelObject.default_context
- self._controller_initial_exten = channel_def.get('exten') or \
- ChannelObject.default_wait_exten
- self._controller_hangup_exten = channel_def.get('hangup-exten') or \
- ChannelObject.default_hangup_exten
- self._controller_audio_exten = channel_def.get('audio-exten') or \
- ChannelObject.default_audio_exten
- self._controller_dtmf_exten = channel_def.get('dtmf-exten') or \
- ChannelObject.default_dtmf_exten
- self._controller_wait_exten = channel_def.get('wait-exten') or \
- ChannelObject.default_wait_exten
+ self._controller_context = channel_def.get('context') \
+ or ChannelObject.default_context
+ self._controller_initial_exten = channel_def.get('exten') \
+ or ChannelObject.default_wait_exten
+ self._controller_hangup_exten = channel_def.get('hangup-exten') \
+ or ChannelObject.default_hangup_exten
+ self._controller_audio_exten = channel_def.get('audio-exten') \
+ or ChannelObject.default_audio_exten
+ self._controller_dtmf_exten = channel_def.get('dtmf-exten') \
+ or ChannelObject.default_dtmf_exten
+ self._controller_wait_exten = channel_def.get('wait-exten') \
+ or ChannelObject.default_wait_exten
delay = channel_def.get('delay') or 0
self.ami = ami
@@ -270,10 +271,10 @@
def __spawn_call_callback(spawn_call_deferred):
"""Actually perform the origination"""
self.ami.originate(channel=self._channel_name,
- context=self._controller_context,
- exten=self._controller_initial_exten,
- priority='1',
- variable={'testuniqueid': '%s' % self._unique_id})
+ context=self._controller_context,
+ exten=self._controller_initial_exten,
+ priority='1',
+ variable={'testuniqueid': '%s' % self._unique_id})
spawn_call_deferred.callback(self)
spawn_call_deferred = defer.Deferred()
@@ -283,7 +284,7 @@
def __str__(self):
return '(Controller: %s; Application %s)' % (self.controller_channel,
- self.app_channel)
+ self.app_channel)
def _handle_redirect_failure(self, reason):
"""If a redirect fails, complain loudly"""
@@ -362,8 +363,8 @@
dtmf, dtmf_deferred = param
if (self._previous_dtmf != dtmf):
deferred = self.ami.setVar(channel=self.controller_channel,
- variable='DTMF_TO_SEND',
- value=dtmf)
+ variable='DTMF_TO_SEND',
+ value=dtmf)
deferred.addCallback(__send_dtmf_redirect, dtmf_deferred)
self._previous_dtmf = dtmf
else:
@@ -401,8 +402,8 @@
sound_file, audio_deferred = param
if (self._previous_sound_file != sound_file):
deferred = self.ami.setVar(channel=self.controller_channel,
- variable="TALK_AUDIO",
- value=sound_file)
+ variable="TALK_AUDIO",
+ value=sound_file)
deferred.addCallback(__stream_audio_redirect, audio_deferred)
self._previous_sound_file = sound_file
else:
@@ -517,8 +518,8 @@
"""Handler for test events"""
if 'channel' not in event:
return
- if self.app_channel not in event['channel'] and \
- self.controller_channel not in event['channel']:
+ if self.app_channel not in event['channel'] \
+ and self.controller_channel not in event['channel']:
return
for observer in self._test_observers:
observer(self, event)
@@ -603,7 +604,7 @@
# sure that this is actually for us by checking the Asterisk channel
# names
if (self.channel_obj.app_channel in event['channel']
- or self.channel_obj.controller_channel in event['channel']):
+ or self.channel_obj.controller_channel in event['channel']):
self.execute_next_action(actions=self.actions)
def execute_next_action(self, result=None, actions=None):
@@ -714,10 +715,11 @@
else int(action_config['sound-delay'])
def __call__(self, channel_object):
- return channel_object.stream_audio_with_dtmf(sound_file=self.sound_file,
- dtmf=self.dtmf,
- sound_delay=self.sound_delay,
- dtmf_delay=self.dtmf_delay)
+ return channel_object.stream_audio_with_dtmf(
+ sound_file=self.sound_file,
+ dtmf=self.dtmf,
+ sound_delay=self.sound_delay,
+ dtmf_delay=self.dtmf_delay)
class ActionSetExpectedResult(object):
@@ -803,9 +805,9 @@
self.add_app_channel = action_config.get('add-app-channel') or False
self.add_control_channel = action_config.get('add-control-channel') or False
self.channel_id = action_config.get('channel-id') or None
- if ((self.add_app_channel and self.add_control_channel) or
- (self.add_app_channel and self.channel_id) or
- (self.add_control_channel and self.channel_id)):
+ if ((self.add_app_channel and self.add_control_channel)
+ or (self.add_app_channel and self.channel_id)
+ or (self.add_control_channel and self.channel_id)):
raise Exception('Only one channel can be added to the message!')
self.message_fields = action_config['fields']
@@ -816,7 +818,8 @@
self.message_fields['Channel'] = channel_object.controller_channel
elif self.channel_id:
test_object = AppTest.get_instance()
- self.message_fields['Channel'] = test_object.get_channel_object(self.channel_id).app_channel
+ self.message_fields['Channel'] = test_object\
+ .get_channel_object(self.channel_id).app_channel
LOGGER.debug('Sending message: %s' % str(self.message_fields))
channel_object.ami.sendMessage(self.message_fields)
return None
@@ -834,7 +837,7 @@
'hangup': ActionHangup,
'fail-test': ActionFailTest,
'end-scenario': ActionEndScenario,
- 'send-ami-message': ActionSendMessage,}
+ 'send-ami-message': ActionSendMessage, }
@staticmethod
def create_action(action_def):
diff --git a/lib/python/asterisk/ari.py b/lib/python/asterisk/ari.py
index 6288f45..50c29fe 100644
--- a/lib/python/asterisk/ari.py
+++ b/lib/python/asterisk/ari.py
@@ -16,7 +16,7 @@
from test_case import TestCase
from pluggable_registry import PLUGGABLE_EVENT_REGISTRY,\
- PLUGGABLE_ACTION_REGISTRY, var_replace
+ PLUGGABLE_ACTION_REGISTRY, var_replace
from test_suite_utils import all_match
from twisted.internet import reactor
try:
@@ -134,9 +134,10 @@
self.create_ami_factory(count=self.asterisk_instances)
def stop_reactor(self):
- if self._ws_connection != None:
+ if self._ws_connection is not None:
self._ws_connection.dropConnection()
super(AriBaseTestObject, self).stop_reactor()
+
class AriTestObject(AriBaseTestObject):
"""Class that acts as a Test Object in the pluggable module framework"""
@@ -161,7 +162,6 @@
if self.iterations is None:
self.iterations = [{'channel': 'Local/s@default',
'application': 'Echo'}]
-
def ami_connect(self, ami):
"""Override of AriBaseTestObject ami_connect
@@ -527,7 +527,8 @@
uri = var_replace(self.uri, values)
url = self.ari.build_url(uri)
requests_method = getattr(requests, self.method)
- params = dict((key, var_replace(val, values)) for key, val in self.params.iteritems())
+ params = dict((key, var_replace(val, values))
+ for key, val in self.params.iteritems())
response = requests_method(
url,
@@ -646,6 +647,7 @@
res = not all_match(nomatch, message)
return res
+
class Range(object):
"""Utility object to handle numeric ranges (inclusive)."""
@@ -685,6 +687,7 @@
else:
# Need exactly this many events
return Range(int(yaml), int(yaml))
+
class ARIPluggableEventModule(object):
"""Subclass of ARIEventInstance that works with the pluggable event action
@@ -726,8 +729,7 @@
:param args: Ignored arguments.
"""
for event_desc in self.config:
- if not event_desc["expected_count_range"]\
- .contains(event_desc["event_count"]):
+ if not event_desc["expected_count_range"].contains(event_desc["event_count"]):
# max could be int or float('inf'); format with %r
LOGGER.error("Expected %d <= count <= %r; was %d (%r, !%r)",
event_desc["expected_count_range"].min_value,
@@ -739,6 +741,7 @@
self.test_object.set_passed(True)
PLUGGABLE_EVENT_REGISTRY.register("ari-events", ARIPluggableEventModule)
+
class ARIPluggableRequestModule(object):
"""Pluggable ARI action module.
"""
diff --git a/lib/python/asterisk/astcsv.py b/lib/python/asterisk/astcsv.py
index 8b23a70..67d25d3 100644
--- a/lib/python/asterisk/astcsv.py
+++ b/lib/python/asterisk/astcsv.py
@@ -19,6 +19,7 @@
LOGGER = logging.getLogger(__name__)
+
class AsteriskCSVLine(object):
"A single Asterisk call detail record"
@@ -59,7 +60,7 @@
for key, value in self.iteritems():
if None not in (value, other.get(key)) and not cmp_fn(value, other.get(key)):
if not silent:
- LOGGER.warn("CSV MATCH FAILED, Expected: %s: '%s' " \
+ LOGGER.warn("CSV MATCH FAILED, Expected: %s: '%s' "
"Got: %s: '%s'" % (key, value, key,
other.get(key)))
return False
@@ -124,7 +125,7 @@
each record"""
if not partial and (len(self) != len(other)):
- LOGGER.warn("CSV MATCH FAILED, different number of records, " \
+ LOGGER.warn("CSV MATCH FAILED, different number of records, "
"self=%d and other=%d" % (len(self), len(other)))
return False
@@ -189,7 +190,7 @@
assert False
elif len(matches) == 1:
- pass # joy!
+ pass # joy!
elif len(matches) > 1:
LOGGER.warn("More than one CSV permutation results in success")
diff --git a/lib/python/asterisk/asterisk.py b/lib/python/asterisk/asterisk.py
index 53342ce..097d227 100755
--- a/lib/python/asterisk/asterisk.py
+++ b/lib/python/asterisk/asterisk.py
@@ -26,6 +26,7 @@
LOGGER = logging.getLogger(__name__)
+
class AsteriskCliCommand(object):
"""Class that manages an Asterisk CLI command."""
@@ -419,8 +420,7 @@
cli_deferred = self.cli_exec("stop gracefully")
else:
cli_deferred = self.cli_exec("core stop gracefully")
- cli_deferred.addCallbacks(__stop_gracefully_callback,
- __stop_gracefully_error)
+ cli_deferred.addCallbacks(__stop_gracefully_callback, __stop_gracefully_error)
def __stop_gracefully_callback(cli_command):
"""Callback handler for the core stop gracefully CLI command"""
@@ -453,11 +453,9 @@
pass
try:
if not self._stop_deferred.called:
- self._stop_deferred.callback("Asterisk %s KILLED" %
- self.host)
+ self._stop_deferred.callback("Asterisk %s KILLED" % self.host)
except defer.AlreadyCalledError:
- LOGGER.warning("Asterisk %s stop deferred already called" %
- self.host)
+ LOGGER.warning("Asterisk %s stop deferred already called" % self.host)
def __process_stopped(reason):
"""Generic callback that raises the stopped deferred subscribers
@@ -671,7 +669,8 @@
raise_error = True
LOGGER.error('Channel dial string must be in the form "tech/data".')
if raise_error is True:
- raise Exception("Cannot originate call!\n"
+ raise Exception(
+ "Cannot originate call!\n"
"Argument string must be in one of these forms:\n"
"<tech/data> application <appname> appdata\n"
"<tech/data> extension <exten>@<context>")
@@ -775,7 +774,7 @@
ast_file.write("%s = %s%s\n" % (var, self.base, val))
elif cat.name == "options":
ast_file.write("#include \"%s/asterisk.options.conf.inc\"\n" %
- (self.astetcdir))
+ (self.astetcdir))
if ast_conf_options:
for (var, val) in ast_conf_options.iteritems():
ast_file.write("%s = %s\n" % (var, val))
diff --git a/lib/python/asterisk/bridge_test_case.py b/lib/python/asterisk/bridge_test_case.py
index f425955..e9ae244 100644
--- a/lib/python/asterisk/bridge_test_case.py
+++ b/lib/python/asterisk/bridge_test_case.py
@@ -17,6 +17,7 @@
LOGGER = logging.getLogger(__name__)
+
class BridgeTestCase(TestCase):
"""A test object for bridging tests"""
@@ -24,17 +25,17 @@
BOB_CONNECTED = '"Alice" <1234>'
FEATURE_MAP = {
- 'blindxfer' : 1,
- 'atxfer' : 2,
- 'disconnect' : 3,
- 'automon' : 4,
- 'automixmon' : 5,
- 'parkcall' : 6,
- 'atxferthreeway' : 7,
- # other arbitrary DTMF features can be matched under DTMF code 8
- # as "unknown"
- 'unknown' : 8
- }
+ 'blindxfer': 1,
+ 'atxfer': 2,
+ 'disconnect': 3,
+ 'automon': 4,
+ 'automixmon': 5,
+ 'parkcall': 6,
+ 'atxferthreeway': 7,
+ # other arbitrary DTMF features can be matched under DTMF code 8
+ # as "unknown"
+ 'unknown': 8
+ }
def __init__(self, test_path='', test_config=None):
"""Class that handles tests involving two-party bridges.
@@ -193,13 +194,12 @@
# Step 1: Initiate a call from Alice to Bob
LOGGER.info("Originating call")
- self.ami_alice.originate(channel=test_run['originate_channel'],
- exten='test_call',
- context='default',
- priority='1',
- variable={'TALK_AUDIO': ('%s' %
- os.path.join(os.getcwd(),
- 'lib/python/asterisk/audio'))})
+ self.ami_alice.originate(
+ channel=test_run['originate_channel'],
+ exten='test_call', context='default', priority='1',
+ variable={
+ 'TALK_AUDIO': os.path.join(os.getcwd(), 'lib/python/asterisk/audio')
+ })
def user_callback(self, ami, event):
"""UserEvent AMI event callback"""
@@ -378,18 +378,22 @@
self.execute_features()
if alice_connected_line is not None:
- self.ami_uut.getVar(self.uut_alice_channel,
+ self.ami_uut.getVar(
+ self.uut_alice_channel,
'CONNECTEDLINE(all)').addCallback(alice_connected,
alice_connected_line)
if bob_connected_line is not None:
- self.ami_uut.getVar(self.uut_bob_channel,
+ self.ami_uut.getVar(
+ self.uut_bob_channel,
'CONNECTEDLINE(all)').addCallback(bob_connected,
bob_connected_line)
if alice_bridge_peer is not None:
- self.ami_uut.getVar(self.uut_alice_channel,
+ self.ami_uut.getVar(
+ self.uut_alice_channel,
'BRIDGEPEER').addCallback(alice_bridgepeer, alice_bridge_peer)
if bob_bridge_peer is not None:
- self.ami_uut.getVar(self.uut_bob_channel,
+ self.ami_uut.getVar(
+ self.uut_bob_channel,
'BRIDGEPEER').addCallback(bob_bridgepeer, bob_bridge_peer)
def execute_features(self):
@@ -408,9 +412,9 @@
def execute_feature(self, feature):
"""Execute the specified feature"""
- if (not 'who' in feature or
- not 'what' in feature or
- not 'success' in feature):
+ if (not 'who' in feature
+ or not 'what' in feature
+ or not 'success' in feature):
LOGGER.warning("Missing necessary feature information")
self.set_passed(False)
if feature['who'] == 'alice':
@@ -435,16 +439,15 @@
for observer in self.feature_start_observers:
observer(self, self.features[self.current_feature])
- LOGGER.info("Sending feature %s from %s" % (feature['what'],
- feature['who']))
+ LOGGER.info("Sending feature %s from %s" % (feature['what'], feature['who']))
# make sure to put a gap between DTMF digits to ensure that events
# headed to the UUT are not ignored because they occur too quickly
sleep(0.25)
ami.playDTMF(channel, BridgeTestCase.FEATURE_MAP[feature['what']])
sleep(0.25)
- if ((feature['what'] == 'blindxfer' or feature['what'] == 'atxfer') and
- 'exten' in feature):
+ if ((feature['what'] == 'blindxfer' or feature['what'] == 'atxfer')
+ and 'exten' in feature):
# playback the extension requested
for digit in list(feature['exten']):
sleep(0.25)
diff --git a/lib/python/asterisk/buildoptions.py b/lib/python/asterisk/buildoptions.py
index 788ef28..9c2c46a 100644
--- a/lib/python/asterisk/buildoptions.py
+++ b/lib/python/asterisk/buildoptions.py
@@ -15,6 +15,7 @@
import sys
import unittest
+
class AsteriskBuildOptions(object):
"""Tracks the build options for an instance of Asterisk"""
@@ -37,7 +38,6 @@
if (self.__parse_buildopts_file(hdr_path)):
return
raise Exception("Failed to open any build options files")
-
def __parse_buildopts_file(self, path):
"""Extract and parse the build options"""
@@ -104,5 +104,6 @@
unittest.main()
+
if __name__ == "__main__":
main()
diff --git a/lib/python/asterisk/cdr.py b/lib/python/asterisk/cdr.py
index 443e034..6380ed0 100644
--- a/lib/python/asterisk/cdr.py
+++ b/lib/python/asterisk/cdr.py
@@ -17,9 +17,9 @@
LOGGER = logging.getLogger(__name__)
+
class CDRModule(object):
"""A module that checks a test for expected CDR results"""
-
def __init__(self, module_config, test_object):
"""Constructor
@@ -67,7 +67,6 @@
sys.exc_info()[0])
return callback_param
-
def match_cdrs(self):
"""Called when all instances of Asterisk have exited. Derived
classes can override this to provide their own behavior for CDR
@@ -79,10 +78,8 @@
for file_name in self.cdr_records[ast_id]:
records = self.cdr_records[ast_id][file_name]
cdr_expect = AsteriskCSVCDR(records=records)
- cdr_file = AsteriskCSVCDR(filename="%s/%s/cdr-csv/%s.csv" %
- (ast_instance.base,
- ast_instance.directories['astlogdir'],
- file_name))
+ cdr_file = AsteriskCSVCDR(filename=ast_instance.get_path(
+ "astlogdir", "cdr-csv", "%s.csv" % file_name))
if cdr_expect.match(cdr_file):
LOGGER.debug("%s.csv: CDR results met expectations" %
file_name)
@@ -97,11 +94,13 @@
class AsteriskCSVCDRLine(astcsv.AsteriskCSVLine):
"""A single Asterisk call detail record"""
- fields = ['accountcode', 'source', 'destination', 'dcontext', 'callerid',
- 'channel', 'dchannel', 'lastapp', 'lastarg', 'start', 'answer', 'end',
- 'duration', 'billsec', 'disposition', 'amaflags', 'uniqueid', 'userfield']
+ fields = [
+ 'accountcode', 'source', 'destination', 'dcontext', 'callerid',
+ 'channel', 'dchannel', 'lastapp', 'lastarg', 'start', 'answer', 'end',
+ 'duration', 'billsec', 'disposition', 'amaflags', 'uniqueid', 'userfield']
- def __init__(self, accountcode=None, source=None, destination=None,
+ def __init__(
+ self, accountcode=None, source=None, destination=None,
dcontext=None, callerid=None, channel=None, dchannel=None,
lastapp=None, lastarg=None, start=None, answer=None, end=None,
duration=None, billsec=None, disposition=None, amaflags=None,
@@ -114,8 +113,8 @@
**dict.
"""
- astcsv.AsteriskCSVLine.__init__(self,
- AsteriskCSVCDRLine.fields, accountcode=accountcode,
+ astcsv.AsteriskCSVLine.__init__(
+ self, AsteriskCSVCDRLine.fields, accountcode=accountcode,
source=source, destination=destination,
dcontext=dcontext, callerid=callerid, channel=channel,
dchannel=dchannel, lastapp=lastapp, lastarg=lastarg, start=start,
@@ -130,7 +129,8 @@
def __init__(self, filename=None, records=None):
"""Initialize CDR records from an Asterisk cdr-csv file"""
- astcsv.AsteriskCSV.__init__(self, filename, records,
+ astcsv.AsteriskCSV.__init__(
+ self, filename, records,
AsteriskCSVCDRLine.fields, AsteriskCSVCDRLine)
@@ -142,11 +142,12 @@
cdr = AsteriskCSVCDR("self_test/Master.csv")
self.assertEqual(len(cdr), 2)
- self.assertTrue(AsteriskCSVCDRLine(duration=7,
- lastapp="hangup").match(cdr[0],
- exact=(True, True)))
- self.assertTrue(cdr[0].match(AsteriskCSVCDRLine(duration=7,
- lastapp="hangup"),
+ self.assertTrue(
+ AsteriskCSVCDRLine(duration=7, lastapp="hangup").match(
+ cdr[0],
+ exact=(True, True)))
+ self.assertTrue(cdr[0].match(
+ AsteriskCSVCDRLine(duration=7, lastapp="hangup"),
exact=(True, True)))
self.assertFalse(cdr[1].match(cdr[0]))
diff --git a/lib/python/asterisk/cel.py b/lib/python/asterisk/cel.py
index 2b7e7a3..d22d67f 100644
--- a/lib/python/asterisk/cel.py
+++ b/lib/python/asterisk/cel.py
@@ -18,6 +18,7 @@
LOGGER = logging.getLogger(__name__)
+
class CELSniffer(object):
"""A pluggable module that sniffs AMI CEL events and dumps them into a YAML
file. Useful during test writing to create a baseline of expected CEL events
@@ -110,7 +111,6 @@
self.match_cels()
return callback_param
-
def match_cels(self):
"""Called when all instances of Asterisk have exited. Derived
classes can override this to provide their own behavior for CEL
@@ -119,10 +119,8 @@
expectations_met = True
for key in self.cel_records:
cel_expect = AsteriskCSVCEL(records=self.cel_records[key])
- cel_file = AsteriskCSVCEL(filename="%s/%s/cel-custom/%s.csv" %
- (self.test_object.ast[0].base,
- self.test_object.ast[0].directories['astlogdir'],
- key))
+ cel_file = AsteriskCSVCEL(filename=self.test_object.ast[0].get_path(
+ "astlogdir", "cel-custom", "%s.csv" % key))
if cel_expect.match(cel_file):
LOGGER.debug("%s.csv - CEL results met expectations" % key)
else:
@@ -137,17 +135,19 @@
class AsteriskCSVCELLine(astcsv.AsteriskCSVLine):
"A single Asterisk Call Event Log record"
- fields = ['eventtype', 'eventtime', 'cidname', 'cidnum', 'ani', 'rdnis',
- 'dnid', 'exten', 'context', 'channel', 'app', 'appdata', 'amaflags',
- 'accountcode', 'uniqueid', 'linkedid', 'bridgepeer', 'userfield',
- 'userdeftype', 'eventextra']
+ fields = [
+ 'eventtype', 'eventtime', 'cidname', 'cidnum', 'ani', 'rdnis',
+ 'dnid', 'exten', 'context', 'channel', 'app', 'appdata', 'amaflags',
+ 'accountcode', 'uniqueid', 'linkedid', 'bridgepeer', 'userfield',
+ 'userdeftype', 'eventextra']
- def __init__(self, eventtype=None, eventtime=None, cidname=None,
- cidnum=None, ani=None, rdnis=None, dnid=None, exten=None,
- context=None, channel=None, app=None, appdata=None,
- amaflags=None, accountcode=None, uniqueid=None, linkedid=None,
- bridgepeer=None, userfield=None, userdeftype=None,
- eventextra=None):
+ def __init__(
+ self, eventtype=None, eventtime=None, cidname=None,
+ cidnum=None, ani=None, rdnis=None, dnid=None, exten=None,
+ context=None, channel=None, app=None, appdata=None,
+ amaflags=None, accountcode=None, uniqueid=None, linkedid=None,
+ bridgepeer=None, userfield=None, userdeftype=None,
+ eventextra=None):
"""Construct an Asterisk CSV CEL.
The arguments list definition must be in the same order that the
@@ -156,7 +156,8 @@
**dict.
"""
- astcsv.AsteriskCSVLine.__init__(self, AsteriskCSVCELLine.fields,
+ astcsv.AsteriskCSVLine.__init__(
+ self, AsteriskCSVCELLine.fields,
eventtype=eventtype, eventtime=eventtime, cidname=cidname,
cidnum=cidnum, ani=ani, rdnis=rdnis, dnid=dnid, exten=exten,
context=context, channel=channel, app=app, appdata=appdata,
@@ -164,13 +165,15 @@
linkedid=linkedid, bridgepeer=bridgepeer, userfield=userfield,
userdeftype=userdeftype, eventextra=eventextra)
+
class AsteriskCSVCEL(astcsv.AsteriskCSV):
"""A representation of an Asterisk CSV CEL file"""
def __init__(self, filename=None, records=None):
"""Initialize CEL records from an Asterisk cel-csv file"""
- astcsv.AsteriskCSV.__init__(self, filename, records,
+ astcsv.AsteriskCSV.__init__(
+ self, filename, records,
AsteriskCSVCELLine.fields, AsteriskCSVCELLine)
@@ -182,13 +185,16 @@
cel = AsteriskCSVCEL("self_test/CELMaster1.csv")
self.assertEqual(len(cel), 16)
- self.assertTrue(AsteriskCSVCELLine(eventtype="LINKEDID_END",
- channel="TinCan/string").match(cel[-1],
- silent=True, exact=(True, True)))
- self.assertTrue(cel[-1].match(AsteriskCSVCELLine(
- eventtype="LINKEDID_END",
- channel="TinCan/string"),
- silent=True, exact=(True, True)))
+ self.assertTrue(AsteriskCSVCELLine(
+ eventtype="LINKEDID_END",
+ channel="TinCan/string").match(cel[-1],
+ silent=True,
+ exact=(True, True)))
+ self.assertTrue(cel[-1].match(
+ AsteriskCSVCELLine(eventtype="LINKEDID_END",
+ channel="TinCan/string"),
+ silent=True,
+ exact=(True, True)))
self.assertFalse(cel[1].match(cel[0], silent=True))
self.assertFalse(cel[0].match(cel[1], silent=True))
diff --git a/lib/python/asterisk/confbridge.py b/lib/python/asterisk/confbridge.py
index 8f681c1..acca1e7 100644
--- a/lib/python/asterisk/confbridge.py
+++ b/lib/python/asterisk/confbridge.py
@@ -21,6 +21,7 @@
LOGGER = logging.getLogger(__name__)
+
class ConfbridgeChannelObject(object):
"""A tracking object that ties together information about the channels
involved in a ConfBridge
@@ -44,6 +45,7 @@
self.caller_channel = caller_channel
self.caller_ami = caller_ami
self.profile = profile_option
+
class ConfbridgeTestState(TestState):
"""Base class test state for ConfBridge. Allows states to send DTMF tones,
@@ -181,7 +183,7 @@
call_id The channel name, from the perspective of the ConfBridge app
audio_file The local path to the file to stream
dtmf The DTMF signal to send
-
+
Note that this is necessary so that when the audio file is finished, we
close the audio recording cleanly; otherwise, Asterisk may detect the
end of file as a hangup
diff --git a/lib/python/asterisk/config.py b/lib/python/asterisk/config.py
index b3e0037..e26b156 100755
--- a/lib/python/asterisk/config.py
+++ b/lib/python/asterisk/config.py
@@ -23,6 +23,7 @@
LOGGER = logging.getLogger(__name__)
+
def is_blank_line(line):
"""Is this a blank line?"""
return re.match("\s*(?:;.*)?$", line) is not None
@@ -103,8 +104,8 @@
match = self.category_re.match(line)
if match is not None:
self.categories.append(
- Category(match.group("name"),
- template=match.group("template") == "!")
+ Category(match.group("name"),
+ template=match.group("template") == "!")
)
elif len(self.categories) == 0:
if not is_blank_line(line):
@@ -165,7 +166,7 @@
self.assertEqual(conf.categories[1].options[1][1], "x|y|z")
self.assertEqual(conf.categories[1].options[2][0], "1234")
self.assertEqual(conf.categories[1].options[2][1],
- "4242,Example Mailbox,root@localhost,,var=val")
+ "4242,Example Mailbox,root@localhost,,var=val")
self.assertEqual(conf.categories[2].name, "template")
self.assertTrue(conf.categories[2].template)
@@ -174,7 +175,7 @@
self.assertEqual(conf.categories[2].options[0][1], "bar")
self.assertEqual(conf.categories[2].options[1][0], "exten")
self.assertEqual(conf.categories[2].options[1][1],
- "_NXX.,n,Wait(1)")
+ "_NXX.,n,Wait(1)")
self.assertEqual(conf.categories[2].options[2][0], "astetcdir")
self.assertEqual(conf.categories[2].options[2][1], "/etc/asterisk")
diff --git a/lib/python/asterisk/dns_server.py b/lib/python/asterisk/dns_server.py
index 8254c0b..19b3fe2 100644
--- a/lib/python/asterisk/dns_server.py
+++ b/lib/python/asterisk/dns_server.py
@@ -33,23 +33,25 @@
def __init__(self, config, test_obj):
"""Initialize and configure the DNS object."""
- zones = []
- port = config.get('port', 10053)
- pyzones = config.get('python-zones', [])
- bindzones = config.get('bind-zones', [])
+ zones = []
+ port = config.get('port', 10053)
+ pyzones = config.get('python-zones', [])
+ bindzones = config.get('bind-zones', [])
- for pyzone in pyzones:
- zones.append(authority.PySourceAuthority('%s/dns_zones/%s' % (test_obj.test_name, pyzone)))
- LOGGER.info("Added Python zone file %s" % (pyzone))
+ for pyzone in pyzones:
+ zones.append(authority.PySourceAuthority(
+ '%s/dns_zones/%s' % (test_obj.test_name, pyzone)))
+ LOGGER.info("Added Python zone file %s" % (pyzone))
- for bindzone in bindzones:
- zones.append(authority.BindAuthority('%s/dns_zones/%s' % (test_obj.test_name, bindzone)))
- LOGGER.info("Added BIND zone file %s" % (bindzone))
+ for bindzone in bindzones:
+ zones.append(authority.BindAuthority(
+ '%s/dns_zones/%s' % (test_obj.test_name, bindzone)))
+ LOGGER.info("Added BIND zone file %s" % (bindzone))
- factory = server.DNSServerFactory(authorities=zones)
- protocol = dns.DNSDatagramProtocol(controller=factory)
+ factory = server.DNSServerFactory(authorities=zones)
+ protocol = dns.DNSDatagramProtocol(controller=factory)
- reactor.listenUDP(port, protocol)
- reactor.listenTCP(port, factory)
+ reactor.listenUDP(port, protocol)
+ reactor.listenTCP(port, factory)
- LOGGER.info("Started DNS server (UDP and TCP) on port %d" % (port))
+ LOGGER.info("Started DNS server (UDP and TCP) on port %d" % (port))
diff --git a/lib/python/asterisk/fd_test_condition.py b/lib/python/asterisk/fd_test_condition.py
index d9a2811..4a8f869 100644
--- a/lib/python/asterisk/fd_test_condition.py
+++ b/lib/python/asterisk/fd_test_condition.py
@@ -15,6 +15,7 @@
LOGGER = logging.getLogger(__name__)
+
class FileDescriptor(object):
"""A small object that tracks a file descriptor.
@@ -115,7 +116,7 @@
finished_deferred = defer.Deferred()
exec_list = [super(FdPreTestCondition, self).get_file_descriptors(ast)
- for ast in self.ast]
+ for ast in self.ast]
defer.DeferredList(exec_list).addCallback(__raise_finished,
finished_deferred)
@@ -138,13 +139,19 @@
else:
# Find all file descriptors in pre-check not in post-check
for fd in related_test_condition.file_descriptors[ast_host]:
- if (len([f for f in self.file_descriptors[ast_host] if fd.number == f.number]) == 0):
+ if (len([
+ f for f
+ in self.file_descriptors[ast_host]
+ if fd.number == f.number]) == 0):
super(FdPostTestCondition, self).fail_check(
"Failed to find file descriptor %d [%s] in "
"post-test check" % (fd.number, fd.info))
# Find all file descriptors in post-check not in pre-check
for fd in self.file_descriptors[ast_host]:
- if (len([f for f in related_test_condition.file_descriptors[ast_host] if fd.number == f.number]) == 0):
+ if (len([
+ f for f
+ in related_test_condition.file_descriptors[ast_host]
+ if fd.number == f.number]) == 0):
super(FdPostTestCondition, self).fail_check(
"Failed to find file descriptor %d [%s] in "
"pre-test check" % (fd.number, fd.info))
@@ -152,7 +159,7 @@
finished_deferred.callback(self)
return finished_deferred
- if related_test_condition == None:
+ if related_test_condition is None:
msg = "No pre-test condition object provided"
super(FdPostTestCondition, self).fail_check(msg)
return
@@ -163,4 +170,3 @@
defer.DeferredList(exec_list).addCallback(__file_descriptors_obtained,
finished_deferred)
return finished_deferred
-
diff --git a/lib/python/asterisk/lock_test_condition.py b/lib/python/asterisk/lock_test_condition.py
index d28150a..0296693 100644
--- a/lib/python/asterisk/lock_test_condition.py
+++ b/lib/python/asterisk/lock_test_condition.py
@@ -17,6 +17,7 @@
LOGGER = logging.getLogger(__name__)
+
class LockSequence(object):
"""This class represents a sequence of lock objects"""
@@ -75,6 +76,7 @@
elif i == 6:
self.thread_func = thread_token.rstrip("())")
i += 1
+
class LockObject(object):
"""This class represents a detected sequence of locks in the system"""
@@ -144,6 +146,7 @@
self.name = lock_tokens[5]
self.addr = lock_tokens[6]
self.lock_count = int(lock_tokens[7].lstrip("(").rstrip(")"))
+
class LockTestCondition(TestCondition):
"""Class that performs checking of locks during test execution. Note that
@@ -226,6 +229,7 @@
finished_deferred)
return finished_deferred
+
class AstMockObjectPassed(object):
"""A lock output that passed"""
@@ -244,6 +248,7 @@
lock_lines += "===\n"
lock_lines += "=======================================================================\n"
return lock_lines
+
class AstMockObjectFailure(object):
"""A lock object that failed"""
@@ -308,6 +313,7 @@
lock_lines += "=======================================================================\n"
return lock_lines
+
class TestConfig(object):
"""Fake TestConfig object"""
@@ -318,6 +324,7 @@
self.type = "Post"
self.related_condition = ""
self.config = {}
+
class LockTestConditionUnitTest(unittest.TestCase):
"""Unit tests for LockTestCondition"""
@@ -347,6 +354,7 @@
obj.register_asterisk_instance(ast2)
obj.evaluate()
self.assertEqual(obj.get_status(), 'Failed')
+
class LockSequenceUnitTest(unittest.TestCase):
"""Tests for parsing a lock sequence"""
@@ -464,6 +472,7 @@
self.assertTrue(obj.locks[0].held)
self.assertTrue(len(obj.locks[0].backtrace) == 0)
+
class LockObjectUnitTest(unittest.TestCase):
"""Unit tests for LockObject"""
diff --git a/lib/python/asterisk/pcap.py b/lib/python/asterisk/pcap.py
index 246b55d..e35a328 100644
--- a/lib/python/asterisk/pcap.py
+++ b/lib/python/asterisk/pcap.py
@@ -29,6 +29,7 @@
LOGGER = logging.getLogger(__name__)
+
class PcapListener(object):
'''
A class that creates a pcap file from a test and optionally provides
@@ -79,6 +80,7 @@
packets as they arrive from the listener '''
pass
+
class Packet():
''' Some IP packet. Base class for everything else '''
@@ -112,7 +114,7 @@
self.sender_report = None
self.receiver_report = None
ports = factory_manager.get_global_data(self.ip_layer.header.source)
- if ports == None:
+ if ports is None:
raise Exception()
if (ports['rtcp'] != self.ip_layer.next.header.source):
raise Exception()
@@ -129,15 +131,20 @@
UBInt16('length'),
UBInt32('ssrc'))
self.rtcp_header = header_def.parse(binary_blob)
- report_block_def = GreedyRange(Struct('report_block',
- UBInt32('ssrc'),
- BitStruct('lost_counts',
- BitField('fraction_lost', 8),
- BitField('packets_lost', 24)),
- UBInt32('sequence_number_received'),
- UBInt32('interarrival_jitter'),
- UBInt32('last_sr'),
- UBInt32('delay_last_sr')))
+ report_block_def = GreedyRange(
+ Struct(
+ 'report_block',
+ UBInt32('ssrc'),
+ BitStruct(
+ 'lost_counts',
+ BitField('fraction_lost', 8),
+ BitField('packets_lost', 24)),
+ UBInt32('sequence_number_received'),
+ UBInt32('interarrival_jitter'),
+ UBInt32('last_sr'),
+ UBInt32('delay_last_sr')
+ )
+ )
if self.rtcp_header.packet_type == 200:
sender_def = Struct('sr',
Struct('sender_info',
@@ -155,9 +162,10 @@
self.receiver_report = receiver_def.parse(binary_blob[8:])
def __str__(self):
- return 'Header: %s\n%s: %s' % (self.rtcp_header,
- 'SR' if self.sender_report is not None else 'RR',
- self.sender_report if self.sender_report is not None else self.receiver_report)
+ if self.sender_report is not None:
+ return "Header: %s\n%s: %s" % (self.rtcp_header, 'SR', self.sender_report)
+ else:
+ return 'Header: %s\n%s: %s' % (self.rtcp_header, 'RR', self.receiver_report)
class RTPPacket(Packet):
@@ -172,10 +180,11 @@
'''
Packet.__init__(self, packet_type='RTP', raw_packet=raw_packet)
ports = factory_manager.get_global_data(self.ip_layer.header.source)
- if ports == None:
+ if ports is None:
raise Exception()
if (ports['rtp'] != self.ip_layer.next.header.source):
raise Exception()
+
class SDPPacket(Packet):
''' An SDP packet. Should be owned by a SIPPacket '''
@@ -214,6 +223,7 @@
Packet.__init__(self, packet_type="PIDF", raw_packet=raw_packet)
self.xml = ascii_packet.strip()
self.content_id = content_id
+
class MWIPacket(Packet):
'''An MWI body. Owned by SIPPacket or a MultipartPacket.'''
@@ -382,9 +392,10 @@
ret_packet.body.packet_type == 'SDP' and \
ret_packet.sdp_packet.rtp_port != 0 and \
ret_packet.sdp_packet.rtcp_port != 0:
- self._factory_manager.add_global_data(ret_packet.ip_layer.header.source,
- {'rtp': ret_packet.sdp_packet.rtp_port,
- 'rtcp': ret_packet.sdp_packet.rtcp_port})
+ self._factory_manager.add_global_data(
+ ret_packet.ip_layer.header.source,
+ {'rtp': ret_packet.sdp_packet.rtp_port,
+ 'rtcp': ret_packet.sdp_packet.rtcp_port})
return ret_packet
@@ -507,7 +518,7 @@
except:
pass
if interpreted_packet is not None:
- break;
+ break
return interpreted_packet
@@ -544,7 +555,8 @@
pass
if packet is None:
return
- LOGGER.debug('Got packet %s from %s' % (str(packet), packet.ip_layer.header.source))
+ LOGGER.debug('Got packet %s from %s' % (
+ str(packet), packet.ip_layer.header.source))
if packet.ip_layer.header.source not in self.traces:
self.traces[packet.ip_layer.header.source] = []
self.traces[packet.ip_layer.header.source].append(packet)
@@ -552,7 +564,6 @@
return
for callback in self._callbacks[packet.packet_type]:
callback(packet)
-
def add_callback(self, packet_type, callback):
''' Add a callback function for when a packet of a particular type
diff --git a/lib/python/asterisk/phones.py b/lib/python/asterisk/phones.py
index a1278cd..78f1200 100755
--- a/lib/python/asterisk/phones.py
+++ b/lib/python/asterisk/phones.py
@@ -162,8 +162,7 @@
try:
self.calls[0].hold()
except pj.Error as err:
- msg = ("Exception occurred while putting call on hold: '%s'" %
- str(err))
+ msg = ("Exception occurred while putting call on hold: '%s'" % str(err))
raise Exception(msg)
@@ -216,7 +215,7 @@
LOGGER.info("Call disconnected: '%s'" % self.call)
sip_call_id = self.call.info().sip_call_id
obj = next((call for call in self.phone.calls
- if call.info().sip_call_id == sip_call_id), None)
+ if call.info().sip_call_id == sip_call_id), None)
try:
self.phone.calls.remove(obj)
except ValueError:
@@ -280,6 +279,7 @@
call_info.remote_uri, call_info.state_text,
call_info.last_code, call_info.last_reason)
+
def call(test_object, triggered_by, ari, event, args):
"""Pluggable action module callback to make a call"""
controller = PjsuaPhoneController.get_instance()
@@ -295,6 +295,7 @@
except:
test_object.stop_reactor()
raise Exception("Exception: '%s'" % str(sys.exc_info()))
+
def hold(test_object, triggered_by, ari, event, args):
"""Pluggable action module callback to place a call on hold"""
@@ -315,6 +316,7 @@
except:
test_object.stop_reactor()
raise Exception("Exception: '%s'" % str(sys.exc_info()))
+
def transfer(test_object, triggered_by, ari, event, args):
"""Pluggable action module callback to transfer a call"""
@@ -351,4 +353,3 @@
if not res:
test_object.stop_reactor()
raise Exception(msg)
-
diff --git a/lib/python/asterisk/pjsua_mod.py b/lib/python/asterisk/pjsua_mod.py
index 37de8ce..6e526dd 100644
--- a/lib/python/asterisk/pjsua_mod.py
+++ b/lib/python/asterisk/pjsua_mod.py
@@ -20,6 +20,7 @@
LOGGER = logging.getLogger(__name__)
+
class RegDetector(pj.AccountCallback):
"""
Class that detects PJSUA account registration
@@ -232,9 +233,9 @@
pj_acct_cfg.transport_id = transport_id
LOGGER.info("Creating PJSUA account %s@%s" % (username, domain))
- account = PJsuaAccount(self.lib.create_account(pj_acct_cfg, False,
- RegDetector(self)),
- self.lib)
+ account = PJsuaAccount(
+ self.lib.create_account(pj_acct_cfg, False, RegDetector(self)),
+ self.lib)
account.add_buddies(acct_cfg.get('buddies', []))
return account
@@ -268,4 +269,3 @@
callback_module = __import__(self.callback_module)
callback_method = getattr(callback_module, self.callback_method)
callback_method(self.test_object, self.pj_accounts)
-
diff --git a/lib/python/asterisk/pluggable_modules.py b/lib/python/asterisk/pluggable_modules.py
index 4eefce6..b1be1f8 100755
--- a/lib/python/asterisk/pluggable_modules.py
+++ b/lib/python/asterisk/pluggable_modules.py
@@ -19,10 +19,11 @@
from starpy import fastagi
from test_runner import load_and_parse_module
from pluggable_registry import PLUGGABLE_ACTION_REGISTRY,\
- PLUGGABLE_EVENT_REGISTRY,\
- PluggableRegistry
+ PLUGGABLE_EVENT_REGISTRY,\
+ PluggableRegistry
LOGGER = logging.getLogger(__name__)
+
class Originator(object):
"""Pluggable module class that originates calls in Asterisk"""
@@ -272,8 +273,8 @@
LOGGER.debug("Channel %s hungup", event['channel'])
self.channels.remove(event['channel'])
self.num_calls += 1
- if 'min_calls' in self.config\
- and self.num_calls < self.config["min_calls"]:
+ if 'min_calls' in self.config \
+ and self.num_calls < self.config["min_calls"]:
return (ami, event)
if len(self.channels) == 0:
LOGGER.info("All channels have hungup; stopping test")
@@ -416,8 +417,8 @@
tolerance = self.actions[self.action_index].get('tolerance')
if ((filesize - size) > tolerance) or ((size - filesize) > tolerance):
LOGGER.error("""File '%s' failed size check: expected %d, actual %d
- (tolerance +/- %d""" % (self.filepath, size, filesize,
- tolerance))
+ (tolerance +/- %d""" % (
+ self.filepath, size, filesize, tolerance))
self.test_object.set_passed(False)
if self.auto_stop:
self.test_object.stop_reactor()
@@ -540,8 +541,8 @@
self.sound_file = config['sound-file']
if not self.sound_file:
raise Exception("No sound file parameters specified")
- if (not self.sound_file.get('file-name') or
- not self.sound_file.get('file-path-type')):
+ if (not self.sound_file.get('file-name')
+ or not self.sound_file.get('file-path-type')):
raise Exception("No file or file path type specified")
if self.sound_file.get('absolute-path'):
file_name = self.sound_file['file-name']
@@ -559,7 +560,7 @@
if not os.path.exists(self.filepath):
LOGGER.error("File '%s' does not exist!" % self.filepath)
self.test_object.set_passed(False)
- if self.auto_stop == True:
+ if self.auto_stop:
self.test_object.stop_reactor()
return
self.actions = self.sound_file.get('actions')
@@ -654,8 +655,9 @@
return
agi.sendCommand(self.commands[idx])\
- .addCallback(self.on_command_success, agi, idx)\
- .addErrback(self.on_command_failure, agi, idx)
+ .addCallback(self.on_command_success, agi, idx)\
+ .addErrback(self.on_command_failure, agi, idx)
+
class EventActionModule(object):
"""A class that links arbitrary events with one or more actions.
@@ -779,6 +781,7 @@
for action_mod in triggered_set["actions"]:
action_mod.run(triggered_by, source, extra)
+
class TestStartEventModule(object):
"""An event module that triggers when the test starts."""
@@ -794,6 +797,7 @@
self.triggered_callback(self, ast)
PLUGGABLE_EVENT_REGISTRY.register("test-start", TestStartEventModule)
+
class LogActionModule(object):
"""An action module that logs a message when triggered."""
@@ -806,6 +810,7 @@
"""Log a message."""
LOGGER.info(self.message)
PLUGGABLE_ACTION_REGISTRY.register("logger", LogActionModule)
+
class CallbackActionModule(object):
"""An action module that calls the specified callback."""
@@ -823,6 +828,7 @@
self.test_object.set_passed(method(self.test_object, triggered_by,
source, extra))
PLUGGABLE_ACTION_REGISTRY.register("callback", CallbackActionModule)
+
class StopTestActionModule(object):
"""Action module that stops a test"""
@@ -847,6 +853,7 @@
self.test_object.stop_reactor()
PLUGGABLE_ACTION_REGISTRY.register("stop_test", StopTestActionModule)
+
class PjsuaPhoneActionModule(object):
"""An action module that instructs a phone to perform an action."""
@@ -862,4 +869,3 @@
method = getattr(self.module, self.method)
method(self.test_object, triggered_by, source, extra, self.config)
PLUGGABLE_ACTION_REGISTRY.register("pjsua_phone", PjsuaPhoneActionModule)
-
diff --git a/lib/python/asterisk/pluggable_registry.py b/lib/python/asterisk/pluggable_registry.py
index 821aa53..8169795 100755
--- a/lib/python/asterisk/pluggable_registry.py
+++ b/lib/python/asterisk/pluggable_registry.py
@@ -12,6 +12,7 @@
LOGGER = logging.getLogger(__name__)
+
class PluggableRegistry(object):
"""Registry for pluggable modules"""
@@ -35,6 +36,7 @@
PLUGGABLE_EVENT_REGISTRY = PluggableRegistry()
PLUGGABLE_ACTION_REGISTRY = PluggableRegistry()
+
def var_replace(text, values):
""" perform variable replacement on text
diff --git a/lib/python/asterisk/sip_channel_test_condition.py b/lib/python/asterisk/sip_channel_test_condition.py
index c1fbda7..150d90f 100644
--- a/lib/python/asterisk/sip_channel_test_condition.py
+++ b/lib/python/asterisk/sip_channel_test_condition.py
@@ -12,6 +12,7 @@
from twisted.internet import defer
from test_conditions import TestCondition
+
class SipChannelTestCondition(TestCondition):
"""Test condition that checks for the existence of SIP channels.
@@ -58,7 +59,7 @@
super(SipChannelTestCondition, self).pass_check()
exec_list = [ast.cli_exec('sip show channels').addCallback(
- __channel_callback) for ast in self.ast]
+ __channel_callback) for ast in self.ast]
defer.DeferredList(exec_list).addCallback(__raise_finished,
finished_deferred)
return finished_deferred
diff --git a/lib/python/asterisk/sip_dialog_test_condition.py b/lib/python/asterisk/sip_dialog_test_condition.py
index 3dad759..2b2538e 100644
--- a/lib/python/asterisk/sip_dialog_test_condition.py
+++ b/lib/python/asterisk/sip_dialog_test_condition.py
@@ -17,6 +17,7 @@
LOGGER = logging.getLogger(__name__)
+
class SipDialogTestCondition(TestCondition):
"""This class is a base class for the pre- and post-test condition
classes that check for the existence of SIP dialogs in Asterisk. It provides
@@ -113,7 +114,7 @@
if ast.host == result.host:
__get_dialogs(ast)
return result
- LOGGER.warning("Unable to determine Asterisk instance from CLI " \
+ LOGGER.warning("Unable to determine Asterisk instance from CLI "
"command run on host %s" % result.host)
return result
@@ -128,8 +129,8 @@
if len(dialog_history) > 0:
# If any dialogs are present before test execution, something
# funny is going on
- super(SipDialogPreTestCondition, self).fail_check("%d dialogs " \
- "were detected in Asterisk %s before test execution" %
+ super(SipDialogPreTestCondition, self).fail_check(
+ "%d dialogs were detected in Asterisk %s before test execution" %
(len(dialog_history), result.host))
else:
super(SipDialogPreTestCondition, self).pass_check()
@@ -143,8 +144,9 @@
self._finished_deferred = defer.Deferred()
# Turn on history and check for dialogs
for ast in self.ast:
- ast.cli_exec("sip set history on").addCallback(__history_finished)
+ ast.cli_exec("sip set history on").addCallback(__history_finished)
return self._finished_deferred
+
class SipDialogPostTestCondition(SipDialogTestCondition):
"""Check the post-test conditions for SIP dialogs.
@@ -209,13 +211,14 @@
history_requirements[dialog][req] = True
if not scheduled:
super(SipDialogPostTestCondition, self).fail_check(
- "Dialog %s in Asterisk instance %s not scheduled for " \
+ "Dialog %s in Asterisk instance %s not scheduled for "
"destruction" % (dialog, self.ast[self._counter].host))
for req in history_requirements[dialog].keys():
- if history_requirements[dialog][req] == False:
+ if not history_requirements[dialog][req]:
super(SipDialogPostTestCondition, self).fail_check(
- "Dialog %s in Asterisk instance %s did not have " \
- "required step in history: %s" % (dialog,
+ "Dialog %s in Asterisk instance %s did not have "
+ "required step in history: %s" % (
+ dialog,
self.ast[self._counter].host, req))
__get_dialogs()
return result
@@ -224,6 +227,7 @@
self._counter = -1
__get_dialogs()
return self._finished_deferred
+
class TestConfig(object):
"""Mock TestConfig object"""
@@ -239,6 +243,7 @@
self.related_condition = ""
self.config = {}
+
class TestConfigWithHistory(TestConfig):
"""Mock TestConfig object with history requirements"""
@@ -249,6 +254,7 @@
self.config['history_requirements'] = []
self.config['history_requirements'].append('Hangup')
self.config['history_requirements'].append('NewChan')
+
class AstMockObjectPostTestNoDestructionFail(object):
"""Mock out CLI execution from Asterisk instance
@@ -313,6 +319,7 @@
ret_string += "11. TxResp SIP/2.0 / 102 BYE - 200 OK\n"
ret_string += "12. Hangup Cause Normal Clearing\n"
return ret_string
+
class AstMockObjectPostTestNoHangupFail(object):
"""Mock out CLI execution from Asterisk instance
@@ -379,6 +386,7 @@
ret_string += "13. Hangup Cause Normal Clearing\n"
return ret_string
+
class AstMockObjectPostTestNoDialogsPass(object):
"""Mock out CLI execution from Asterisk instance
@@ -406,6 +414,7 @@
ret_string += "-= Dialog objects:\n\n"
return ret_string
+
class AstMockObjectPostTestPass(object):
"""Mock out CLI execution from Asterisk instance
@@ -472,6 +481,7 @@
ret_string += "13. Hangup Cause Normal Clearing\n"
return ret_string
+
class AstMockObjectPreTestFail(object):
"""Mock out CLI execution from Asterisk instance
@@ -517,6 +527,7 @@
ret_string += "12. TxResp SIP/2.0 / 102 BYE - 200 OK\n"
ret_string += "13. Hangup Cause Normal Clearing\n"
return ret_string
+
class AstMockObjectPreTestPass(object):
"""Mock out CLI execution from Asterisk instance
@@ -626,6 +637,7 @@
obj.evaluate()
self.assertEqual(obj.get_status(), 'Failed')
+
def main():
"""Execute the unit tests"""
logging.basicConfig(level=logging.DEBUG)
diff --git a/lib/python/asterisk/sipp.py b/lib/python/asterisk/sipp.py
index 4ce4a89..ff959e0 100644
--- a/lib/python/asterisk/sipp.py
+++ b/lib/python/asterisk/sipp.py
@@ -151,7 +151,6 @@
"""
self._scenario_stopped_observers.append(observer)
-
def register_intermediate_obverver(self, observer):
"""Register a function to be called in between SIPp scenarios.
@@ -208,7 +207,7 @@
sipp_scenarios = []
for scenario in scenario_set:
if ("coordinated-sender" in scenario
- and "coordinated-receiver" in scenario):
+ and "coordinated-receiver" in scenario):
sipp_scenarios.append(CoordinatedScenario(self.test_name,
scenario))
else:
@@ -234,10 +233,8 @@
self._fail_on_any,
self._intermediate_callback_fn,
final_deferred)
- sipp_sequence.register_scenario_start_callback(
- self._scenario_start_callback_fn)
- sipp_sequence.register_scenario_stop_callback(
- self._scenario_stop_callback_fn)
+ sipp_sequence.register_scenario_start_callback(self._scenario_start_callback_fn)
+ sipp_sequence.register_scenario_stop_callback(self._scenario_stop_callback_fn)
sipp_sequence.execute()
def _intermediate_callback_fn(self, result):
@@ -255,7 +252,7 @@
observer(result)
return result
- # Allow some time for the SIPp process to come up
+ # Allow some time for the SIPp process to come up
reactor.callLater(.25, __run_callback, result)
def _scenario_stop_callback_fn(self, result):
@@ -596,24 +593,24 @@
method, then auto-fail the test if the scenario fails. """
if not self.passed:
LOGGER.warning("SIPp Scenario %s Failed" %
- self.scenario['scenario'])
+ self.scenario['scenario'])
self._test_case.passed = False
self._test_case.stop_reactor()
return result
sipp_args = [
- self.sipp, self.target,
- '-sf',
- '%s/sipp/%s' % (self.test_dir, self.scenario['scenario']),
- '-nostdin',
- '-skip_rlimit',
+ self.sipp, self.target,
+ '-sf',
+ '%s/sipp/%s' % (self.test_dir, self.scenario['scenario']),
+ '-nostdin',
+ '-skip_rlimit',
]
default_args = {
- '-p' : str(self.default_port),
- '-m' : '1',
- '-i' : '127.0.0.1',
- '-timeout' : '20s'
+ '-p': str(self.default_port),
+ '-m': '1',
+ '-i': '127.0.0.1',
+ '-timeout': '20s'
}
# Override and extend defaults
@@ -622,17 +619,15 @@
# correct the path specified by -slave_cfg
if '-slave_cfg' in default_args:
- default_args['-slave_cfg'] = ('%s/sipp/%s' %
- (self.test_dir,
- default_args['-slave_cfg']))
+ default_args['-slave_cfg'] = ('%s/sipp/%s' % (
+ self.test_dir, default_args['-slave_cfg']))
if '-inf' in default_args:
- default_args['-inf'] = ('%s/sipp/%s' %
- (self.test_dir,
- default_args['-inf']))
+ default_args['-inf'] = ('%s/sipp/%s' % (
+ self.test_dir, default_args['-inf']))
for (key, val) in default_args.items():
- sipp_args.extend([ key, val ])
+ sipp_args.extend([key, val])
sipp_args.extend(self.positional_args)
LOGGER.info("Executing SIPp scenario: %s" % self.scenario['scenario'])
@@ -646,14 +641,16 @@
self._test_case = test_case
exit_deferred.addCallback(__evaluate_scenario_results)
- self._process = SIPpProtocol(self.scenario['scenario'], exit_deferred, start_deferred)
+ self._process = SIPpProtocol(self.scenario['scenario'], exit_deferred,
+ start_deferred)
reactor.spawnProcess(self._process,
sipp_args[0],
sipp_args,
- {"TERM" : "vt100", },
+ {"TERM": "vt100", },
None,
None)
return self._our_exit_deferred
+
class CoordinatedScenario(object):
"""A SIPp based scenario for the Asterisk testsuite that handles basic 3PCC
@@ -697,15 +694,14 @@
sender_config['key-args']['-3pcc'] = coordination_address
target = sender_config.get('target', '127.0.0.1')
self.sender = SIPpScenario(test_dir,
- sender_config['key-args'],
- sender_config.get('ordered-args', []),
- target=target)
+ sender_config['key-args'],
+ sender_config.get('ordered-args', []),
+ target=target)
self.exited = False
self.passed = False
self.results = []
self.name = "Coordinated Scenario %d" % self.coordination_port
-
def kill(self):
"""Kill the executing SIPp scenario"""
@@ -762,9 +758,8 @@
# setup callback for receiver completion
exit_deferred = defer.Deferred()
receiver_deferred = self.receiver.run(test_case,
- receiver_start_deferred)
+ receiver_start_deferred)
receiver_deferred.addCallback(__scenario_callback, exit_deferred)
-
return exit_deferred
@@ -858,4 +853,3 @@
deferds.append(deferred)
defer.DeferredList(deferds).addCallback(__set_pass_fail)
-
diff --git a/lib/python/asterisk/sippversion.py b/lib/python/asterisk/sippversion.py
index c244de8..b1110fc 100644
--- a/lib/python/asterisk/sippversion.py
+++ b/lib/python/asterisk/sippversion.py
@@ -15,6 +15,7 @@
import test_suite_utils
+
class SIPpVersion:
"""A SIPp Version.
@@ -36,14 +37,14 @@
if version is None and feature is None:
sipp = test_suite_utils.which("sipp")
if sipp is None:
- return
+ return
cmd = [
sipp, "-v"
]
try:
sipp_process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ stderr=subprocess.STDOUT)
except OSError:
return
for line in sipp_process.stdout:
@@ -124,6 +125,7 @@
if value.find("PCAP") > -1:
self.pcap = True
+
class SIPpVersionTests(unittest.TestCase):
def test_version(self):
v = SIPpVersion("v3.2", None)
diff --git a/lib/python/asterisk/syncami.py b/lib/python/asterisk/syncami.py
index 48dd237..031ae99 100644
--- a/lib/python/asterisk/syncami.py
+++ b/lib/python/asterisk/syncami.py
@@ -91,5 +91,3 @@
parser = HeaderParser()
return parser.parsestr(data)
-
-
diff --git a/lib/python/asterisk/test_case.py b/lib/python/asterisk/test_case.py
index c5f5143..83de4cc 100644
--- a/lib/python/asterisk/test_case.py
+++ b/lib/python/asterisk/test_case.py
@@ -33,6 +33,7 @@
LOGGER = None
+
def setup_logging(log_dir):
"""Initialize the logger"""
@@ -195,9 +196,9 @@
# Get those global conditions that are not in the self conditions
for g_cond in global_conditions:
- disallowed = [i for i in conditions \
- if i[0].get_name() == g_cond[0].get_name() \
- and i[1] == g_cond[1]]
+ disallowed = [i for i in conditions
+ if i[0].get_name() == g_cond[0].get_name()
+ and i[1] == g_cond[1]]
if len(disallowed) == 0:
conditions.append(g_cond)
@@ -207,12 +208,11 @@
if pre_post_type == "PRE":
self.condition_controller.register_pre_test_condition(obj)
elif pre_post_type == "POST":
- self.condition_controller.register_post_test_condition(\
- obj, related_name)
+ self.condition_controller.register_post_test_condition(obj, related_name)
else:
msg = "Unknown condition type [%s]" % pre_post_type
LOGGER.warning(msg)
- self.condition_controller.register_observer(\
+ self.condition_controller.register_observer(
self.handle_condition_failure, 'Failed')
def create_asterisk(self, count=1, base_configs_path=None):
@@ -321,7 +321,7 @@
# port, while a general logger will want to watch more general traffic
# which can be filtered later.
return PcapListener(device, bpf_filter, dumpfile, self._pcap_callback,
- snaplen, buffer_size)
+ snaplen, buffer_size)
def start_asterisk(self):
"""This method will be called when the reactor is running, but
@@ -539,7 +539,7 @@
def reset_timeout(self):
"""Resets the reactor timeout"""
- if self.timeout_id != None:
+ if self.timeout_id is not None:
original_time = datetime.fromtimestamp(self.timeout_id.getTime())
self.timeout_id.reset(self.reactor_timeout)
new_time = datetime.fromtimestamp(self.timeout_id.getTime())
@@ -624,7 +624,7 @@
A token that can be removed from the test at a later time, if the test
should pass
"""
- fail_token = {'uuid' : uuid.uuid4(), 'message' : message}
+ fail_token = {'uuid': uuid.uuid4(), 'message': message}
self.fail_tokens.append(fail_token)
return fail_token
@@ -647,7 +647,7 @@
If a test module has already claimed that the test has failed, then this
method will ignore any further attempts to change the pass/fail status.
"""
- if self.passed == False:
+ if not self.passed:
return
self.passed = value
@@ -685,15 +685,15 @@
if test_config is None or 'test-iterations' not in test_config:
# No special test configuration defined, use defaults
- variables = {'testuniqueid': '%s' % (str(uuid.uuid1())),}
+ variables = {'testuniqueid': '%s' % (str(uuid.uuid1())), }
defaults = {'channel': SimpleTestCase.default_channel,
'application': SimpleTestCase.default_application,
- 'variable': variables,}
+ 'variable': variables, }
self._test_runs.append(defaults)
else:
# Use the info in the test config to figure out what we want to run
for iteration in test_config['test-iterations']:
- variables = {'testuniqueid': '%s' % (str(uuid.uuid1())),}
+ variables = {'testuniqueid': '%s' % (str(uuid.uuid1())), }
iteration['variable'] = variables
self._test_runs.append(iteration)
if 'expected_events' in test_config:
@@ -719,7 +719,6 @@
# Kick off the test runs
self.__start_new_call(ami)
-
def __originate_call(self, ami, call_details):
"""Actually originate a call
@@ -750,7 +749,7 @@
call_details['otherchannelid'] = None
if 'application' in call_details:
msg += " with application %s" % call_details['application']
- deferred = ami.originate(\
+ deferred = ami.originate(
channel=call_details['channel'],
application=call_details['application'],
variable=call_details['variable'],
@@ -762,7 +761,7 @@
msg += " to %s@%s at %s" % (call_details['exten'],
call_details['context'],
call_details['priority'],)
- deferred = ami.originate(\
+ deferred = ami.originate(
channel=call_details['channel'],
context=call_details['context'],
exten=call_details['exten'],
@@ -777,7 +776,6 @@
else:
deferred.addErrback(self.handle_originate_failure)
LOGGER.info(msg)
-
def __varset_cb(self, ami, event):
"""VarSet event handler. This event helps us tie back the channel
@@ -803,7 +801,6 @@
LOGGER.debug("Tracking originated channel %s as %s (ID %s)" % (
originating_channel, event['channel'], event['value']))
-
def __hangup_cb(self, ami, event):
"""Hangup Event handler.
@@ -819,7 +816,6 @@
self._current_run += 1
self.__start_new_call(ami)
-
def __start_new_call(self, ami):
"""Kick off the next new call, or, if we've run out of calls to make,
stop the test
@@ -830,7 +826,6 @@
else:
LOGGER.info("All calls executed, stopping")
reactor.callLater(self._end_test_delay, self.stop_reactor)
-
def __event_cb(self, ami, event):
"""UserEvent callback handler.
@@ -850,7 +845,6 @@
self._current_run += 1
self.__start_new_call(ami)
-
def hangup(self, result):
"""Called when all channels are hung up"""
@@ -866,7 +860,6 @@
def verify_event(self, event):
"""Virtual method used to verify values in the event."""
return True
-
def run(self):
"""Run the test!"""
@@ -902,4 +895,3 @@
self.create_ami_factory(count=self.asterisk_instances)
if self.connect_agi:
self.create_fastagi_factory(count=self.asterisk_instances)
-
diff --git a/lib/python/asterisk/test_conditions.py b/lib/python/asterisk/test_conditions.py
index 9fa23aa..1bff550 100644
--- a/lib/python/asterisk/test_conditions.py
+++ b/lib/python/asterisk/test_conditions.py
@@ -24,9 +24,11 @@
LOGGER = logging.getLogger(__name__)
+
def enum(**enums):
"""Make an enumeration out of the passed in values"""
return type('Enum', (), enums)
+
def handle_condition_failure(test_condition):
"""Handle a failure in a test condition.
@@ -116,7 +118,7 @@
if pre[0].get_name() == matching_pre_condition_name:
matching_pre_condition = pre[0]
break
- if (matching_pre_condition == None):
+ if (matching_pre_condition is None):
err_msg = ("No pre condition found matching %s" %
matching_pre_condition_name)
LOGGER.error(err_msg)
@@ -227,7 +229,7 @@
observer(test_condition)
if (test_condition.get_status() == 'Failed' and
- self._stop_test_callback != None):
+ self._stop_test_callback is not None):
self._stop_test_callback()
@@ -281,7 +283,7 @@
build_option, expected_value = option
if not TestCondition.build_options.check_option(build_option,
expected_value):
- LOGGER.debug("Build option %s not set to %s; test condition " \
+ LOGGER.debug("Build option %s not set to %s; test condition "
"[%s] will not be checked" % (build_option,
expected_value,
self._name))
@@ -355,7 +357,7 @@
if (self._test_status == TEST_STATUSES.Inconclusive):
self._test_status = TEST_STATUSES.Passed
- def fail_check(self, reason = ""):
+ def fail_check(self, reason=""):
"""Mark that the test condition has failed.
Note that the test condition cannot be changed once placed in this
@@ -367,5 +369,3 @@
self._test_status = TEST_STATUSES.Failed
if (reason != ""):
self.failure_reasons.append(reason)
-
-
diff --git a/lib/python/asterisk/test_config.py b/lib/python/asterisk/test_config.py
index 0229c6f..2057c1b 100644
--- a/lib/python/asterisk/test_config.py
+++ b/lib/python/asterisk/test_config.py
@@ -25,6 +25,7 @@
from buildoptions import AsteriskBuildOptions
from sippversion import SIPpVersion
+
class TestConditionConfig(object):
"""This class creates a test condition config and will build up an
object that derives from TestCondition based on that configuration
@@ -78,6 +79,7 @@
obj = mod(self)
return obj
return None
+
class Dependency(object):
"""Class that checks and stores the dependencies for a particular Test."""
@@ -280,7 +282,7 @@
def _process_global_settings(self):
"""Process settings in the top-level test-yaml config file"""
- if self.global_test_config != None:
+ if self.global_test_config is not None:
settings = self.global_test_config
self.condition_definitions = settings.condition_definitions
self.forced_version = settings.forced_version
@@ -295,10 +297,10 @@
if self.test_configuration in self.config:
self.config = self.config[self.test_configuration]
- if self.config != None and 'exclude-tests' in self.config:
+ if self.config is not None and 'exclude-tests' in self.config:
self.excluded_tests = self.config['exclude-tests']
else:
- print ("WARNING - test configuration [%s] not found in " \
+ print ("WARNING - test configuration [%s] not found in "
"config file" % self.test_configuration)
def _process_testinfo(self):
@@ -306,7 +308,7 @@
self.summary = "(none)"
self.description = "(none)"
- if self.config == None:
+ if self.config is None:
return
if "testinfo" not in self.config:
return
@@ -363,7 +365,7 @@
if not self.config:
print "ERROR: Failed to load configuration for test '%s'" % \
- self.test_name
+ self.test_name
return
self._process_global_settings()
@@ -381,8 +383,8 @@
"""
conditions = []
conditions_temp = []
- if (not self.config or 'properties' not in self.config or
- 'testconditions' not in self.config['properties']):
+ if (not self.config or 'properties' not in self.config
+ or 'testconditions' not in self.config['properties']):
return conditions
for conf in self.config['properties'].get('testconditions'):
diff --git a/lib/python/asterisk/test_runner.py b/lib/python/asterisk/test_runner.py
index 8d28f9d..640ccd3 100755
--- a/lib/python/asterisk/test_runner.py
+++ b/lib/python/asterisk/test_runner.py
@@ -26,6 +26,7 @@
from version import AsteriskVersion
+
class TestModuleFinder(object):
"""Determines if a module is a test module that can be loaded"""
@@ -82,12 +83,15 @@
if fullname in sys.modules:
mod = sys.modules[fullname]
else:
- mod = sys.modules.setdefault(fullname,
+ mod = sys.modules.setdefault(
+ fullname,
imp.load_source(fullname, self._get_filename(fullname)))
return mod
+
sys.path_hooks.append(TestModuleFinder)
+
def load_test_modules(test_config, test_object, ast_version):
"""Load the pluggable modules for a test
@@ -111,8 +115,8 @@
if check_module_version(module_spec, ast_version):
# If there's a specific portion of the config for this module,
# use it
- if ('config-section' in module_spec and
- module_spec['config-section'] in test_config):
+ if ('config-section' in module_spec
+ and module_spec['config-section'] in test_config):
module_config = test_config[module_spec['config-section']]
else:
module_config = test_config
@@ -122,9 +126,10 @@
# and the test object that they attach to
module_type(module_config, test_object)
else:
- LOGGER.debug("Skipping the loading of test module %s due to it's " \
- "minversion and/or maxversion not being met." %
- module_spec['typename'])
+ LOGGER.debug("Skipping the loading of test module %s due to it's "
+ "minversion and/or maxversion not being met." %
+ module_spec['typename'])
+
def check_module_version(module_spec, ast_version):
"""Check the module configuration for minversion and maxversion and check
@@ -142,13 +147,14 @@
modminversion = module_spec.get('minversion')
modmaxversion = module_spec.get('maxversion')
if (modminversion is not None and
- AsteriskVersion(ast_version) < AsteriskVersion(modminversion)):
+ AsteriskVersion(ast_version) < AsteriskVersion(modminversion)):
return False
if (modmaxversion is not None and
- AsteriskVersion(ast_version) >= AsteriskVersion(modmaxversion)):
+ AsteriskVersion(ast_version) >= AsteriskVersion(modmaxversion)):
return False
return True
+
def load_and_parse_module(type_name):
"""Take a qualified module/object name, load the module, and return
@@ -178,6 +184,7 @@
module = getattr(module, comp)
return module
+
def create_test_object(test_path, test_config):
"""Create the specified test object from the test configuration
@@ -206,8 +213,8 @@
return None
test_object_config = None
- if ('config-section' in test_object_spec and
- test_object_spec['config-section'] in test_config):
+ if ('config-section' in test_object_spec
+ and test_object_spec['config-section'] in test_config):
test_object_config = test_config[test_object_spec['config-section']]
else:
test_object_config = test_config
@@ -217,6 +224,7 @@
# config object, if none is specified)
test_obj = module_obj(test_path, test_object_config)
return test_obj
+
def load_test_config(test_directory):
"""Load and parse the yaml test config specified by the test_directory
@@ -246,6 +254,7 @@
return test_config
+
def read_module_paths(test_config, test_path):
"""Read additional paths required for loading modules for the test
@@ -273,7 +282,8 @@
TestModuleFinder.supported_paths.append(os.path.join(test_path, path))
sys.path.append(os.path.join(test_path, path))
-def main(argv = None):
+
+def main(argv=None):
"""Main entry point for the test run
Returns:
@@ -285,7 +295,7 @@
args = sys.argv
if (len(args) < 2):
- LOGGER.error("test_runner requires the full path to the test " \
+ LOGGER.error("test_runner requires the full path to the test "
"directory to execute")
return 1
test_directory = args[1]
diff --git a/lib/python/asterisk/test_state.py b/lib/python/asterisk/test_state.py
index 6e6cf1d..5d04ca4 100644
--- a/lib/python/asterisk/test_state.py
+++ b/lib/python/asterisk/test_state.py
@@ -15,6 +15,7 @@
LOGGER = logging.getLogger(__name__)
+
def print_test_event(event):
"""Log a test event
@@ -54,16 +55,16 @@
print_test_event(event)
if event['type'] == 'StateChange':
- if (self._current_state != None):
+ if (self._current_state is not None):
self._current_state.handle_state_change(ami, event)
else:
LOGGER.error("No initial state set before TestEvent received")
self._current_state = FailureTestState(self)
elif event['type'] == 'Assert':
- if (self._assert_handler != None):
+ if (self._assert_handler is not None):
self._assert_handler(ami, event)
else:
- LOGGER.warn("ASSERT received but no handler defined; " \
+ LOGGER.warn("ASSERT received but no handler defined; "
"test will now fail")
self.fail_test()
@@ -109,7 +110,7 @@
"""
self.controller = controller
- if (self.controller == None):
+ if (self.controller is None):
LOGGER.error("Controller is none")
raise RuntimeError('Controller is none')
@@ -133,6 +134,7 @@
new_state The new TestState to change to
"""
self.controller.change_state(new_state)
+
class FailureTestState(TestState):
"""A generic failure state.
@@ -170,5 +172,3 @@
new_state The new TestState to change to
"""
return
-
-
diff --git a/lib/python/asterisk/test_suite_utils.py b/lib/python/asterisk/test_suite_utils.py
index 29425c5..d32d7d7 100644
--- a/lib/python/asterisk/test_suite_utils.py
+++ b/lib/python/asterisk/test_suite_utils.py
@@ -1,5 +1,5 @@
#! /usr/bin/env python
-"""Asterisk testsuite utils
+"""Asterisk testsuite utils
This module provides access to Asterisk testsuite utility
functions from within python code.
@@ -21,6 +21,7 @@
from tempfile import mkstemp
LOGGER = logging.getLogger(__name__)
+
def which(program):
"""Find the executable for a specified program
@@ -45,6 +46,7 @@
return None
+
def file_replace_string(file_name, pattern, subst):
"""Replace strings within a file with substr.
@@ -58,7 +60,7 @@
"""
# Create temp file
f_handle, abs_path = mkstemp()
- new_file = open(abs_path,'w')
+ new_file = open(abs_path, 'w')
old_file = open(file_name)
for line in old_file:
new_file.write(line.replace(pattern, subst))
@@ -71,6 +73,7 @@
# Move new file
move(abs_path, file_name)
+
def all_match(pattern, message):
"""Match all items in a pattern to some message values
@@ -81,8 +84,7 @@
:param message: Message to compare.
:returns: True if message matches pattern; False otherwise.
"""
- LOGGER.debug('Pattern: %s, message %s' %
- (str(pattern), str(message)))
+ LOGGER.debug('Pattern: %s, message %s' % (str(pattern), str(message)))
if pattern is None:
# Empty pattern always matches
return True
@@ -168,4 +170,3 @@
return addr
return None
-
diff --git a/lib/python/asterisk/thread_test_condition.py b/lib/python/asterisk/thread_test_condition.py
index d366810..0b622ea 100644
--- a/lib/python/asterisk/thread_test_condition.py
+++ b/lib/python/asterisk/thread_test_condition.py
@@ -15,6 +15,7 @@
LOGGER = logging.getLogger(__name__)
+
class ThreadTestCondition(TestCondition):
"""Base class for the thread pre-/post-test conditions
@@ -72,9 +73,8 @@
initial_partition = initial_partition[2].partition(' ')
thread_id = initial_partition[0]
thread_name = initial_partition[2].partition(' ')[0]
- if (thread_id != "" and
- thread_name != "" and
- thread_name not in self.ignored_threads):
+ if (thread_id != "" and thread_name != ""
+ and thread_name not in self.ignored_threads):
LOGGER.debug("Tracking thread %s[%s]" %
(thread_name, thread_id))
thread_list.append((thread_id, thread_name))
@@ -115,13 +115,12 @@
return result
finished_deferred = defer.Deferred()
- defer_list = defer.DeferredList(
- [ast.cli_exec("core show threads").addCallback(
- __show_threads_callback,
- ast)
- for ast in self.ast])
+ defer_list = defer.DeferredList([
+ ast.cli_exec("core show threads").addCallback(__show_threads_callback, ast)
+ for ast in self.ast])
defer_list.addCallback(__threads_gathered, finished_deferred)
return finished_deferred
+
class ThreadPostTestCondition(ThreadTestCondition):
"""The post-test condition object.
@@ -175,46 +174,42 @@
ast_match_found = True
# Create a list of each thread in the post check not in the
# pre check and vice versa
- bad_post_threads = [thread_obj
- for thread_obj in ast[1]
- if not __evaluate_thread_obj_in_list(thread_obj, pre_ast[1])]
- bad_pre_threads = [thread_obj
- for thread_obj in pre_ast[1]
- if not __evaluate_thread_obj_in_list(thread_obj, ast[1])]
+ bad_post_threads = [
+ thread_obj for thread_obj in ast[1]
+ if not __evaluate_thread_obj_in_list(thread_obj, pre_ast[1])]
+ bad_pre_threads = [
+ thread_obj for thread_obj in pre_ast[1]
+ if not __evaluate_thread_obj_in_list(thread_obj, ast[1])]
if (bad_post_threads):
for thread_obj in bad_post_threads:
- msg = ("Failed to find thread %s[%s] on Asterisk " \
+ msg = ("Failed to find thread %s[%s] on Asterisk "
"instance %s in pre-test check" %
(thread_obj[1], thread_obj[0], ast[0]))
super(ThreadPostTestCondition, self).fail_check(msg)
if (bad_pre_threads):
for thread_obj in bad_pre_threads:
- msg = ("Failed to find thread %s[%s] on Asterisk " \
- "instance %s in post-test check" %
- (thread_obj[1], thread_obj[0], ast[0]))
+ msg = ("Failed to find thread %s[%s] on Asterisk "
+ "instance %s in post-test check" %
+ (thread_obj[1], thread_obj[0], ast[0]))
super(ThreadPostTestCondition, self).fail_check(msg)
- if (len(bad_post_threads) == 0 and
- len(bad_pre_threads) == 0):
+ if (len(bad_post_threads) == 0 and len(bad_pre_threads) == 0):
super(ThreadPostTestCondition, self).pass_check()
if not ast_match_found:
- msg = ("Unable to find Asterisk instance %s in pre-test " \
+ msg = ("Unable to find Asterisk instance %s in pre-test "
"condition check" % ast[0])
super(ThreadPostTestCondition, self).fail_check(msg)
finished_deferred.callback(self)
return result
# This must have a related_test_condition value passed in
- if (related_test_condition == None):
+ if (related_test_condition is None):
msg = "No pre-test condition provided"
super(ThreadPostTestCondition, self).fail_check(msg)
return
finished_deferred = defer.Deferred()
- defer_list = defer.DeferredList(
- [ast.cli_exec("core show threads").addCallback(
- __show_threads_callback,
- ast)
- for ast in self.ast])
+ defer_list = defer.DeferredList([
+ ast.cli_exec("core show threads").addCallback(__show_threads_callback, ast)
+ for ast in self.ast])
defer_list.addCallback(__threads_gathered, finished_deferred)
return finished_deferred
-
diff --git a/lib/python/asterisk/version.py b/lib/python/asterisk/version.py
index 3b7bf5b..4432cf2 100644
--- a/lib/python/asterisk/version.py
+++ b/lib/python/asterisk/version.py
@@ -22,6 +22,7 @@
LOGGER = logging.getLogger(__name__)
+
def parse_branch_name(branch_tokens):
"""Parse an Asterisk branch version"""
name = branch_tokens[0]
@@ -29,13 +30,14 @@
for i in range(1, len(branch_tokens)):
# Stop when we hit the revision
if branch_tokens[i][0] == 'r':
- candidate = branch_tokens[i].replace('r','')
- candidate = candidate.replace('M','').replace('m','')
+ candidate = branch_tokens[i].replace('r', '')
+ candidate = candidate.replace('M', '').replace('m', '')
if candidate.isdigit():
break
name += '-' + branch_tokens[i]
munched += 1
return (name, munched)
+
def parse_version(version_string):
"""Parse a 'standard' Asterisk version"""
@@ -52,13 +54,15 @@
count += 1
return (parsed_numbers, True)
+
def parse_revision(revision_string):
"""Parse a modified version of Asterisk"""
candidate = revision_string.replace('M', '')
- candidate = candidate.replace('r','').replace('m','')
+ candidate = candidate.replace('r', '').replace('m', '')
if candidate.isdigit():
return (int(candidate), True)
return (0, False)
+
def parse_feature(feature_string):
"""Parse a feature from a version"""
@@ -71,6 +75,7 @@
return (feature, iteration, True)
return ('', -1, False)
+
def parse_version_modifier(version_modifier):
"""Parse a version modifier"""
for modifier in AsteriskVersion.supported_modifiers:
@@ -82,11 +87,13 @@
return (modifier, iteration, True)
return ('', -1, False)
+
def parse_parent_branch(parent_branch):
"""Parse a parent branch out of a version branch"""
# Parent branch can be just about anything, so just accept it.
# This should be the last thing called.
return (parent_branch, True)
+
def parse_version_string(raw_version):
"""Parse a raw version string into its parts"""
@@ -136,7 +143,7 @@
if not handled and not parent:
(parent, handled) = parse_parent_branch(token)
if not handled:
- LOGGER.error("Unable to parse token '%s' in version " \
+ LOGGER.error("Unable to parse token '%s' in version "
"string '%s'" % (token, raw_version))
count += 1
return (parsed_numbers[0], parsed_numbers[1], parsed_numbers[2],
@@ -150,9 +157,9 @@
This class handles Asterisk version strings.
"""
- supported_features = [ 'cert', 'digiumphones', 'dfsg' ]
+ supported_features = ['cert', 'digiumphones', 'dfsg']
- supported_modifiers = [ 'rc', 'beta' ]
+ supported_modifiers = ['rc', 'beta']
def __init__(self, version=None):
"""Construct an Asterisk Version parser.
@@ -252,7 +259,7 @@
try:
process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
- stderr=None)
+ stderr=None)
version = process.stdout.read()
except OSError as o_excep:
LOGGER.error("OSError [%d]: %s" % (o_excep.errno,
@@ -598,6 +605,7 @@
version2 = AsteriskVersion("Asterisk SVN-branch-1.8.11-cert-r368608")
self.assertTrue(version1 < version2)
+
def main():
"""Run the unit tests"""
unittest.main()
diff --git a/lib/python/asterisk/voicemail.py b/lib/python/asterisk/voicemail.py
index ce8668d..8ea9915 100644
--- a/lib/python/asterisk/voicemail.py
+++ b/lib/python/asterisk/voicemail.py
@@ -28,6 +28,7 @@
LOGGER = logging.getLogger(__name__)
+
class TestCondition(object):
"""Class that holds the state of some test condition.
@@ -60,13 +61,14 @@
Keyword Arguments:
value The value to evaluate
"""
- if self._evaluate_fn != None:
+ if self._evaluate_fn is not None:
self.current_state = self._evaluate_fn(value, self)
else:
- LOGGER.warn("No evaluate function defined, setting " \
+ LOGGER.warn("No evaluate function defined, setting "
"current_state to [%s]" % str(value))
self.current_state = value
return
+
def handle_redirect_failure(reason):
"""Generic AMI redirect failure handler"""
@@ -105,14 +107,14 @@
This should be called once ami_receiver and ami_sender have both been
set by the test derived from this class.
"""
- if (self.ami_receiver != None and self.ami_sender != None):
+ if (self.ami_receiver is not None and self.ami_sender is not None):
self.test_state_controller = TestStateController(self,
self.ami_receiver)
def hangup(self):
"""Hang up the current call"""
- if self.ast_sender == None:
+ if self.ast_sender is None:
msg = "Attempting to send hangup to non-existant Asterisk instance"
LOGGER.error(msg)
failure = FailureTestState(self.condition_controller)
@@ -132,7 +134,7 @@
dtmf_to_send The DTMF code to send
"""
LOGGER.info("Attempting to send DTMF " + dtmf_to_send)
- if self.ami_sender == None:
+ if self.ami_sender is None:
LOGGER.error("Attempting to send DTMF to non-connected caller AMI")
failure = FailureTestState(self.condition_controller)
self.test_state_controller.change_state(failure)
@@ -158,7 +160,7 @@
audio_file The local path to the file to stream
"""
- if self.ami_sender == None:
+ if self.ami_sender is None:
msg = "Attempting to send sound file to non-connected caller AMI"
LOGGER.error(msg)
failure = FailureTestState(self.condition_controller)
@@ -189,7 +191,7 @@
close the audio recording cleanly; otherwise, Asterisk will detect the
end of file as a hangup
"""
- if self.ami_sender == None:
+ if self.ami_sender is None:
msg = "Attempting to send sound/DTMF to non-connected caller AMI"
LOGGER.error(msg)
failure = FailureTestState(self.condition_controller)
@@ -255,9 +257,10 @@
ret_val = True
for key, value in self._test_conditions.items():
if not value.current_state:
- LOGGER.warn("Test Condition [" + key + "] has not passed")
+ LOGGER.warn("Test Condition [" + key + "] has not passed")
ret_val = False
return ret_val
+
class VoiceMailState(TestState):
"""Base class for VoiceMail TestEvent state machine handling
@@ -275,7 +278,7 @@
"""
super(VoiceMailState, self).__init__(controller)
self.voice_mail_test = voice_mail_test
- if self.voice_mail_test == None:
+ if self.voice_mail_test is None:
msg = "Failed to set voicemail test object"
LOGGER.error(msg)
raise RuntimeError(msg)
@@ -385,7 +388,7 @@
except IOError as io_error:
if io_error.errno == errno.EACCESS:
- LOGGER.error("You do not have sufficient permissions to " \
+ LOGGER.error("You do not have sufficient permissions to "
"perform the necessary directory manipulations")
return False
@@ -588,7 +591,6 @@
return True
return False
-
class UserObject(object):
"""An object that holds voicemail user information"""
@@ -734,4 +736,3 @@
os.rmdir(mailbox_path)
return True
-
diff --git a/lib/python/pcap_listener.py b/lib/python/pcap_listener.py
index afc917a..4c27f91 100644
--- a/lib/python/pcap_listener.py
+++ b/lib/python/pcap_listener.py
@@ -1,6 +1,7 @@
from twisted.internet import abstract, protocol
from yappcap import PcapLive, findalldevs, PcapTimeout
+
class PcapFile(abstract.FileDescriptor):
"""Treat a live pcap capture as a file for Twisted to call select() on"""
def __init__(self, protocol, interface, xfilter=None, dumpfile=None,
@@ -8,7 +9,7 @@
abstract.FileDescriptor.__init__(self)
p = PcapLive(interface, autosave=dumpfile, snaplen=snaplen,
- buffer_size=buffer_size)
+ buffer_size=buffer_size)
p.activate()
p.blocking = False
@@ -63,7 +64,7 @@
if snaplen is None:
snaplen = 65535
self.pf = PcapFile(self, interface, bpf_filter, dumpfile, snaplen,
- buffer_size)
+ buffer_size)
self.callback = callback
def makeConnection(self, transport):
diff --git a/lib/python/qm.py b/lib/python/qm.py
index f4d2e2e..bf63a41 100644
--- a/lib/python/qm.py
+++ b/lib/python/qm.py
@@ -38,6 +38,7 @@
'((NOT B) OR (NOT A))'
"""
+
class QM:
def __init__(self, variables):
"""
@@ -66,9 +67,9 @@
# Handle special case for functions that always evaluate to True or False.
if len(ones) == 0:
- return 0,'0'
- if len(ones) + len(dc) == 1<<self.numvars:
- return 0,'1'
+ return 0, '0'
+ if len(ones) + len(dc) == 1 << self.numvars:
+ return 0, '1'
primes = self.compute_primes(ones + dc)
return self.unate_cover(list(primes), ones)
@@ -82,10 +83,10 @@
"""
sigma = []
- for i in xrange(self.numvars+1):
+ for i in xrange(self.numvars + 1):
sigma.append(set())
for i in cubes:
- sigma[bitcount(i)].add((i,0))
+ sigma[bitcount(i)].add((i, 0))
primes = set()
while sigma:
@@ -96,7 +97,7 @@
for a in c1:
for b in c2:
m = merge(a, b)
- if m != None:
+ if m is not None:
nc.add(m)
redundant |= set([a, b])
nsigma.append(nc)
@@ -128,14 +129,14 @@
covers = []
if len(chart) > 0:
covers = [set([i]) for i in chart[0]]
- for i in xrange(1,len(chart)):
+ for i in xrange(1, len(chart)):
new_covers = []
for cover in covers:
for prime_index in chart[i]:
x = set(cover)
x.add(prime_index)
append = True
- for j in xrange(len(new_covers)-1,-1,-1):
+ for j in xrange(len(new_covers)-1, -1, -1):
if x <= new_covers[j]:
del new_covers[j]
elif x > new_covers[j]:
@@ -152,7 +153,7 @@
min_complexity = complexity
result = primes_in_cover
- return min_complexity,result
+ return min_complexity, result
def calculate_complexity(self, minterms):
"""
@@ -200,7 +201,7 @@
complexity = len(minterms)
if complexity == 1:
complexity = 0
- mask = (1<<self.numvars)-1
+ mask = (1 << self.numvars) - 1
for minterm in minterms:
masked = ~minterm[1] & mask
term_complexity = bitcount(masked)
@@ -221,12 +222,12 @@
NOT.
"""
- if isinstance(minterms,str):
+ if isinstance(minterms, str):
return minterms
def parentheses(glue, array):
if len(array) > 1:
- return ''.join(['(',glue.join(array),')'])
+ return ''.join(['(', glue.join(array), ')'])
else:
return glue.join(array)
@@ -234,21 +235,23 @@
for minterm in minterms:
and_terms = []
for j in xrange(len(self.variables)):
- if minterm[0] & 1<<j:
+ if minterm[0] & 1 << j:
and_terms.append(self.variables[j])
- elif not minterm[1] & 1<<j:
+ elif not minterm[1] & 1 << j:
and_terms.append('(NOT %s)' % self.variables[j])
or_terms.append(parentheses(' AND ', and_terms))
return parentheses(' OR ', or_terms)
+
def bitcount(i):
""" Count set bits of the input. """
res = 0
while i > 0:
- res += i&1
- i>>=1
+ res += i & 1
+ i >>= 1
return res
+
def is_power_of_two_or_zero(x):
"""
@@ -258,6 +261,7 @@
return (x & (~x + 1)) == x
+
def merge(i, j):
""" Combine two minterms. """
@@ -266,5 +270,4 @@
y = i[0] ^ j[0]
if not is_power_of_two_or_zero(y):
return None
- return (i[0] & j[0],i[1]|y)
-
+ return (i[0] & j[0], i[1] | y)
diff --git a/lib/python/rlmi.py b/lib/python/rlmi.py
index 0f27050..3165be2 100644
--- a/lib/python/rlmi.py
+++ b/lib/python/rlmi.py
@@ -29,7 +29,8 @@
Namespace = pyxb.namespace.NamespaceForURI(u'urn:ietf:params:xml:ns:rlmi', create_if_missing=True)
Namespace.configureCategories(['typeBinding', 'elementBinding'])
-def CreateFromDocument (xml_text, default_namespace=None, location_base=None):
+
+def CreateFromDocument(xml_text, default_namespace=None, location_base=None):
"""Parse the given XML and use the document element to create a
Python instance.
@@ -62,7 +63,8 @@
instance = handler.rootObject()
return instance
-def CreateFromDOM (node, default_namespace=None):
+
+def CreateFromDOM(node, default_namespace=None):
"""Create a Python instance from the given DOM node.
The node tag must correspond to an element declaration in this module.
@@ -73,7 +75,7 @@
# Atomic simple type: [anonymous]
-class STD_ANON (pyxb.binding.datatypes.string, pyxb.binding.basis.enumeration_mixin):
+class STD_ANON(pyxb.binding.datatypes.string, pyxb.binding.basis.enumeration_mixin):
"""An atomic simple type."""
@@ -86,8 +88,9 @@
STD_ANON.terminated = STD_ANON._CF_enumeration.addEnumeration(unicode_value=u'terminated', tag=u'terminated')
STD_ANON._InitializeFacetMap(STD_ANON._CF_enumeration)
+
# Complex type [anonymous] with content type ELEMENT_ONLY
-class CTD_ANON (pyxb.binding.basis.complexTypeDefinition):
+class CTD_ANON(pyxb.binding.basis.complexTypeDefinition):
"""Complex type [anonymous] with content type ELEMENT_ONLY"""
_TypeDefinition = None
_ContentTypeTag = pyxb.binding.basis.complexTypeDefinition._CT_ELEMENT_ONLY
@@ -97,68 +100,60 @@
_ElementMap = {}
_AttributeMap = {}
# Base type is pyxb.binding.datatypes.anyType
-
+
# Element {urn:ietf:params:xml:ns:rlmi}resource uses Python identifier resource
__resource = pyxb.binding.content.ElementDeclaration(pyxb.namespace.ExpandedName(Namespace, u'resource'), 'resource', '__urnietfparamsxmlnsrlmi_CTD_ANON_urnietfparamsxmlnsrlmiresource', True, pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 24, 2), )
-
resource = property(__resource.value, __resource.set, None, None)
-
# Element {urn:ietf:params:xml:ns:rlmi}name uses Python identifier name
__name = pyxb.binding.content.ElementDeclaration(pyxb.namespace.ExpandedName(Namespace, u'name'), 'name', '__urnietfparamsxmlnsrlmi_CTD_ANON_urnietfparamsxmlnsrlminame', True, pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 58, 2), )
-
name = property(__name.value, __name.set, None, None)
-
# Attribute uri uses Python identifier uri
__uri = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(None, u'uri'), 'uri', '__urnietfparamsxmlnsrlmi_CTD_ANON_uri', pyxb.binding.datatypes.anyURI, required=True)
__uri._DeclarationLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 15, 6)
__uri._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 15, 6)
-
+
uri = property(__uri.value, __uri.set, None, None)
-
# Attribute version uses Python identifier version
__version = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(None, u'version'), 'version', '__urnietfparamsxmlnsrlmi_CTD_ANON_version', pyxb.binding.datatypes.unsignedInt, required=True)
__version._DeclarationLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 16, 6)
__version._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 16, 6)
-
+
version = property(__version.value, __version.set, None, None)
-
# Attribute fullState uses Python identifier fullState
__fullState = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(None, u'fullState'), 'fullState', '__urnietfparamsxmlnsrlmi_CTD_ANON_fullState', pyxb.binding.datatypes.boolean, required=True)
__fullState._DeclarationLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 18, 6)
__fullState._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 18, 6)
-
+
fullState = property(__fullState.value, __fullState.set, None, None)
-
# Attribute cid uses Python identifier cid
__cid = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(None, u'cid'), 'cid', '__urnietfparamsxmlnsrlmi_CTD_ANON_cid', pyxb.binding.datatypes.string)
__cid._DeclarationLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 20, 6)
__cid._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 20, 6)
-
+
cid = property(__cid.value, __cid.set, None, None)
_AttributeWildcard = pyxb.binding.content.Wildcard(process_contents=pyxb.binding.content.Wildcard.PC_lax, namespace_constraint=pyxb.binding.content.Wildcard.NC_any)
_ElementMap.update({
- __resource.name() : __resource,
- __name.name() : __name
+ __resource.name(): __resource,
+ __name.name(): __name
})
_AttributeMap.update({
- __uri.name() : __uri,
- __version.name() : __version,
- __fullState.name() : __fullState,
- __cid.name() : __cid
+ __uri.name(): __uri,
+ __version.name(): __version,
+ __fullState.name(): __fullState,
+ __cid.name(): __cid
})
-
# Complex type [anonymous] with content type ELEMENT_ONLY
-class CTD_ANON_ (pyxb.binding.basis.complexTypeDefinition):
+class CTD_ANON_(pyxb.binding.basis.complexTypeDefinition):
"""Complex type [anonymous] with content type ELEMENT_ONLY"""
_TypeDefinition = None
_ContentTypeTag = pyxb.binding.basis.complexTypeDefinition._CT_ELEMENT_ONLY
@@ -168,41 +163,36 @@
_ElementMap = {}
_AttributeMap = {}
# Base type is pyxb.binding.datatypes.anyType
-
+
# Element {urn:ietf:params:xml:ns:rlmi}instance uses Python identifier instance
__instance = pyxb.binding.content.ElementDeclaration(pyxb.namespace.ExpandedName(Namespace, u'instance'), 'instance', '__urnietfparamsxmlnsrlmi_CTD_ANON__urnietfparamsxmlnsrlmiinstance', True, pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 36, 2), )
-
instance = property(__instance.value, __instance.set, None, None)
-
# Element {urn:ietf:params:xml:ns:rlmi}name uses Python identifier name
__name = pyxb.binding.content.ElementDeclaration(pyxb.namespace.ExpandedName(Namespace, u'name'), 'name', '__urnietfparamsxmlnsrlmi_CTD_ANON__urnietfparamsxmlnsrlminame', True, pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 58, 2), )
-
name = property(__name.value, __name.set, None, None)
-
# Attribute uri uses Python identifier uri
__uri = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(None, u'uri'), 'uri', '__urnietfparamsxmlnsrlmi_CTD_ANON__uri', pyxb.binding.datatypes.anyURI, required=True)
__uri._DeclarationLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 32, 6)
__uri._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 32, 6)
-
+
uri = property(__uri.value, __uri.set, None, None)
_AttributeWildcard = pyxb.binding.content.Wildcard(process_contents=pyxb.binding.content.Wildcard.PC_lax, namespace_constraint=pyxb.binding.content.Wildcard.NC_any)
_ElementMap.update({
- __instance.name() : __instance,
- __name.name() : __name
+ __instance.name(): __instance,
+ __name.name(): __name
})
_AttributeMap.update({
- __uri.name() : __uri
+ __uri.name(): __uri
})
-
# Complex type [anonymous] with content type SIMPLE
-class CTD_ANON_2 (pyxb.binding.basis.complexTypeDefinition):
+class CTD_ANON_2(pyxb.binding.basis.complexTypeDefinition):
"""Complex type [anonymous] with content type SIMPLE"""
_TypeDefinition = pyxb.binding.datatypes.string
_ContentTypeTag = pyxb.binding.basis.complexTypeDefinition._CT_SIMPLE
@@ -212,25 +202,23 @@
_ElementMap = {}
_AttributeMap = {}
# Base type is pyxb.binding.datatypes.string
-
+
# Attribute {http://www.w3.org/XML/1998/namespace}lang uses Python identifier lang
__lang = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(pyxb.namespace.XML, 'lang'), 'lang', '__urnietfparamsxmlnsrlmi_CTD_ANON_2_httpwww_w3_orgXML1998namespacelang', pyxb.binding.xml_.STD_ANON_lang)
__lang._DeclarationLocation = None
__lang._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 62, 10)
-
+
lang = property(__lang.value, __lang.set, None, None)
_ElementMap.update({
-
})
_AttributeMap.update({
- __lang.name() : __lang
+ __lang.name(): __lang
})
-
# Complex type [anonymous] with content type ELEMENT_ONLY
-class CTD_ANON_3 (pyxb.binding.basis.complexTypeDefinition):
+class CTD_ANON_3(pyxb.binding.basis.complexTypeDefinition):
"""Complex type [anonymous] with content type ELEMENT_ONLY"""
_TypeDefinition = None
_ContentTypeTag = pyxb.binding.basis.complexTypeDefinition._CT_ELEMENT_ONLY
@@ -240,50 +228,45 @@
_ElementMap = {}
_AttributeMap = {}
# Base type is pyxb.binding.datatypes.anyType
-
+
# Attribute id uses Python identifier id
__id = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(None, u'id'), 'id', '__urnietfparamsxmlnsrlmi_CTD_ANON_3_id', pyxb.binding.datatypes.string, required=True)
__id._DeclarationLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 42, 6)
__id._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 42, 6)
-
+
id = property(__id.value, __id.set, None, None)
-
# Attribute state uses Python identifier state
__state = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(None, u'state'), 'state', '__urnietfparamsxmlnsrlmi_CTD_ANON_3_state', STD_ANON, required=True)
__state._DeclarationLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 43, 6)
__state._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 43, 6)
-
+
state = property(__state.value, __state.set, None, None)
-
# Attribute reason uses Python identifier reason
__reason = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(None, u'reason'), 'reason', '__urnietfparamsxmlnsrlmi_CTD_ANON_3_reason', pyxb.binding.datatypes.string)
__reason._DeclarationLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 52, 6)
__reason._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 52, 6)
-
+
reason = property(__reason.value, __reason.set, None, None)
-
# Attribute cid uses Python identifier cid
__cid = pyxb.binding.content.AttributeUse(pyxb.namespace.ExpandedName(None, u'cid'), 'cid', '__urnietfparamsxmlnsrlmi_CTD_ANON_3_cid', pyxb.binding.datatypes.string)
__cid._DeclarationLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 54, 6)
__cid._UseLocation = pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 54, 6)
-
+
cid = property(__cid.value, __cid.set, None, None)
_AttributeWildcard = pyxb.binding.content.Wildcard(process_contents=pyxb.binding.content.Wildcard.PC_lax, namespace_constraint=pyxb.binding.content.Wildcard.NC_any)
_HasWildcardElement = True
_ElementMap.update({
-
})
_AttributeMap.update({
- __id.name() : __id,
- __state.name() : __state,
- __reason.name() : __reason,
- __cid.name() : __cid
+ __id.name(): __id,
+ __state.name(): __state,
+ __reason.name(): __reason,
+ __cid.name(): __cid
})
-
list = pyxb.binding.basis.element(pyxb.namespace.ExpandedName(Namespace, u'list'), CTD_ANON, location=pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 7, 2))
@@ -298,13 +281,12 @@
instance = pyxb.binding.basis.element(pyxb.namespace.ExpandedName(Namespace, u'instance'), CTD_ANON_3, location=pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 36, 2))
Namespace.addCategoryObject('elementBinding', instance.name().localName(), instance)
-
-
CTD_ANON._AddElement(pyxb.binding.basis.element(pyxb.namespace.ExpandedName(Namespace, u'resource'), CTD_ANON_, scope=CTD_ANON, location=pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 24, 2)))
CTD_ANON._AddElement(pyxb.binding.basis.element(pyxb.namespace.ExpandedName(Namespace, u'name'), CTD_ANON_2, scope=CTD_ANON, location=pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 58, 2)))
-def _BuildAutomaton ():
+
+def _BuildAutomaton():
# Remove this helper function from the namespace after it is invoked
global _BuildAutomaton
del _BuildAutomaton
@@ -327,26 +309,21 @@
st_1 = fac.State(symbol, is_initial=True, final_update=final_update, is_unordered_catenation=False)
states.append(st_1)
transitions = []
- transitions.append(fac.Transition(st_0, [
- fac.UpdateInstruction(cc_0, True) ]))
- transitions.append(fac.Transition(st_1, [
- fac.UpdateInstruction(cc_0, False) ]))
+ transitions.append(fac.Transition(st_0, [fac.UpdateInstruction(cc_0, True)]))
+ transitions.append(fac.Transition(st_1, [fac.UpdateInstruction(cc_0, False)]))
st_0._set_transitionSet(transitions)
transitions = []
- transitions.append(fac.Transition(st_1, [
- fac.UpdateInstruction(cc_1, True) ]))
+ transitions.append(fac.Transition(st_1, [fac.UpdateInstruction(cc_1, True)]))
st_1._set_transitionSet(transitions)
return fac.Automaton(states, counters, True, containing_state=None)
CTD_ANON._Automaton = _BuildAutomaton()
-
-
-
CTD_ANON_._AddElement(pyxb.binding.basis.element(pyxb.namespace.ExpandedName(Namespace, u'instance'), CTD_ANON_3, scope=CTD_ANON_, location=pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 36, 2)))
CTD_ANON_._AddElement(pyxb.binding.basis.element(pyxb.namespace.ExpandedName(Namespace, u'name'), CTD_ANON_2, scope=CTD_ANON_, location=pyxb.utils.utility.Location('/usr/src/asterisk/trunk/shit.xsd', 58, 2)))
-def _BuildAutomaton_ ():
+
+def _BuildAutomaton_():
# Remove this helper function from the namespace after it is invoked
global _BuildAutomaton_
del _BuildAutomaton_
@@ -369,22 +346,17 @@
st_1 = fac.State(symbol, is_initial=True, final_update=final_update, is_unordered_catenation=False)
states.append(st_1)
transitions = []
- transitions.append(fac.Transition(st_0, [
- fac.UpdateInstruction(cc_0, True) ]))
- transitions.append(fac.Transition(st_1, [
- fac.UpdateInstruction(cc_0, False) ]))
+ transitions.append(fac.Transition(st_0, [fac.UpdateInstruction(cc_0, True)]))
+ transitions.append(fac.Transition(st_1, [fac.UpdateInstruction(cc_0, False)]))
st_0._set_transitionSet(transitions)
transitions = []
- transitions.append(fac.Transition(st_1, [
- fac.UpdateInstruction(cc_1, True) ]))
+ transitions.append(fac.Transition(st_1, [fac.UpdateInstruction(cc_1, True)]))
st_1._set_transitionSet(transitions)
return fac.Automaton(states, counters, True, containing_state=None)
CTD_ANON_._Automaton = _BuildAutomaton_()
-
-
-def _BuildAutomaton_2 ():
+def _BuildAutomaton_2():
# Remove this helper function from the namespace after it is invoked
global _BuildAutomaton_2
del _BuildAutomaton_2
@@ -400,9 +372,7 @@
st_0 = fac.State(symbol, is_initial=True, final_update=final_update, is_unordered_catenation=False)
states.append(st_0)
transitions = []
- transitions.append(fac.Transition(st_0, [
- fac.UpdateInstruction(cc_0, True) ]))
+ transitions.append(fac.Transition(st_0, [fac.UpdateInstruction(cc_0, True)]))
st_0._set_transitionSet(transitions)
return fac.Automaton(states, counters, True, containing_state=None)
CTD_ANON_3._Automaton = _BuildAutomaton_2()
-
diff --git a/lib/python/sip_message.py b/lib/python/sip_message.py
index 3dacc14..85cd9fd 100644
--- a/lib/python/sip_message.py
+++ b/lib/python/sip_message.py
@@ -1,8 +1,10 @@
#!/usr/bin/env python
import re
+
class SIPParseError(Exception):
pass
+
# This is not particularly efficient. I don't care.
# Ok, I do, but I'm not going to do anything about it.
@@ -25,7 +27,8 @@
# Now, seperate Request/Response line from the rest of the data
(self.first_line, rest) = data.split("\r\n", 1)
- # Now convert any multi-line headers to a single line, and then split on newlines
+ # Now convert any multi-line headers to a single line, and then
+ # split on newlines
header_array = re.sub(r'\r\n[ \t]+', ' ', rest).split("\r\n")
# Now convert the headers into an array of (field, val) tuples
@@ -49,7 +52,12 @@
return res
def __str__(self):
- return "%s\r\n%s\r\n\r\n%s" % (self.first_line, "\r\n".join(["%s: %s" % (h[0].title(), h[1]) for h in self.headers]), self.body)
+ return "%s\r\n%s\r\n\r\n%s" % (
+ self.first_line,
+ "\r\n".join([
+ "%s: %s" % (h[0].title(), h[1])
+ for h in self.headers]),
+ self.body)
class SIPMessageTest(object):
@@ -92,7 +100,6 @@
it = sipmsg.get_header_all(key)
return self._match_val(regex, it)
-
def test_sip_msg(self, sipmsg):
if len(self.matches_left) > 0:
diff --git a/runtests.py b/runtests.py
index c4d2cda..971de7c 100755
--- a/runtests.py
+++ b/runtests.py
@@ -129,14 +129,18 @@
try:
(run_num, run_dir, archive_dir) = self._find_run_dirs()
symlink_dir = os.path.dirname(run_dir)
- absolute_dir = os.path.join(os.path.dirname(symlink_dir), os.readlink(symlink_dir))
+ absolute_dir = os.path.join(os.path.dirname(symlink_dir),
+ os.readlink(symlink_dir))
shutil.rmtree(absolute_dir)
os.remove(symlink_dir)
except:
- print "Unable to clean up directory for test %s (non-fatal)" % self.test_name
+ print "Unable to clean up directory for " \
+ "test %s (non-fatal)" % self.test_name
self.__parse_run_output(self.stdout)
- print 'Test %s %s\n' % (cmd, 'timedout' if timedout else 'passed' if self.passed else 'failed')
+ print 'Test %s %s\n' % (
+ cmd,
+ 'timedout' if timedout else 'passed' if self.passed else 'failed')
else:
print "FAILED TO EXECUTE %s, it must exist and be executable" % cmd
@@ -160,7 +164,12 @@
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
dest_file = open(dest_dir + "/backtrace_%s.txt" % str(random_num), "w")
- gdb_cmd = ["gdb", "-se", "asterisk", "-ex", "bt full", "-ex", "thread apply all bt", "--batch", "-c", core]
+ gdb_cmd = ["gdb",
+ "-se", "asterisk",
+ "-ex", "bt full",
+ "-ex", "thread apply all bt",
+ "--batch",
+ "-c", core]
print "Running %s" % (" ".join(gdb_cmd),)
try:
res = subprocess.call(gdb_cmd, stdout=dest_file, stderr=subprocess.STDOUT)
@@ -168,7 +177,8 @@
print "error analyzing core dump; gdb exited with %d" % res
# Copy the backtrace over to the logs
except OSError, ose:
- print "OSError ([%d]: %s) occurred while executing %r" % (ose.errno, ose.strerror, gdb_cmd)
+ print "OSError ([%d]: %s) occurred while executing %r" % \
+ (ose.errno, ose.strerror, gdb_cmd)
return
except:
print "Unknown exception occurred while executing %r" % (gdb_cmd,)
@@ -178,7 +188,8 @@
try:
os.unlink(core)
except OSError, e:
- print "Error removing core file: %s: Beware of the stale core file in CWD!" % (e,)
+ print "Error removing core file: %s: " \
+ "Beware of the stale core file in CWD!" % (e,)
def _find_run_dirs(self):
test_run_dir = os.path.join(Asterisk.test_suite_root,
@@ -236,7 +247,8 @@
if (run_num == 0):
return
- refcounter_py = os.path.join(run_dir, "ast1/var/lib/asterisk/scripts/refcounter.py")
+ refcounter_py = os.path.join(run_dir,
+ "ast1/var/lib/asterisk/scripts/refcounter.py")
if not os.path.exists(refcounter_py):
return
@@ -267,9 +279,9 @@
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
hardlink_or_copy(refs_txt,
- os.path.join(dest_dir, "refs.txt"))
+ os.path.join(dest_dir, "refs.txt"))
hardlink_or_copy(refs_in,
- os.path.join(dest_dir, "refs"))
+ os.path.join(dest_dir, "refs"))
self.stdout_print("REF_DEBUG identified leaks, mark test as failure")
self.passed = False
i += 1
@@ -299,8 +311,8 @@
dest_dir = os.path.join(archive_dir,
'ast%d/var/log/asterisk' % i)
self._archive_files(ast_dir, dest_dir,
- 'messages.txt', 'full.txt', 'mmlog',
- 'valgrind.xml', 'valgrind-summary.txt')
+ 'messages.txt', 'full.txt', 'mmlog',
+ 'valgrind.xml', 'valgrind-summary.txt')
i += 1
def _archive_pcap_dump(self, run_dir, archive_dir):
@@ -322,7 +334,8 @@
self.tests = []
self.global_config = self._parse_global_config()
- self.tests = sorted(self._parse_test_yaml("tests", ast_version), key=lambda test: test.test_name)
+ self.tests = sorted(self._parse_test_yaml("tests", ast_version),
+ key=lambda test: test.test_name)
self.total_time = 0.0
self.total_count = 0
self.total_failures = 0
@@ -349,13 +362,15 @@
for val in t:
path = "%s/%s" % (test_dir, t[val])
if val == "test":
- # If we specified a subset of tests, there's no point loading the others.
+ # If we specified a subset of tests, there's no point loading
+ # the others.
if (self.options.tests and
not any((path + '/').startswith(test)
for test in self.options.tests)):
continue
- tests.append(TestRun(path, ast_version, self.options, self.global_config, self.options.timeout))
+ tests.append(TestRun(path, ast_version, self.options,
+ self.global_config, self.options.timeout))
elif val == "dir":
tests += self._parse_test_yaml(path, ast_version)
@@ -404,11 +419,9 @@
for d in t.test_config.deps:
if d.version:
print " --> Dependency: %s" % (d.name)
- print " --> Version: %s -- Met: %s" % (d.version,
- str(d.met))
+ print " --> Version: %s -- Met: %s" % (d.version, str(d.met))
else:
- print " --> Dependency: %s -- Met: %s" % (d.name,
- str(d.met))
+ print " --> Dependency: %s -- Met: %s" % (d.name, str(d.met))
i += 1
@@ -418,12 +431,13 @@
for t in self.tests:
if t.can_run is False:
continue
- if self.global_config != None:
+ if self.global_config is not None:
for excluded in self.global_config.excluded_tests:
if excluded in t.test_name:
continue
i += 1
- print "Tests to run: %d, Maximum test inactivity time: %d sec." % (i, (self.options.timeout / 1000))
+ print "Tests to run: %d, Maximum test inactivity time: %d sec." % \
+ (i, (self.options.timeout / 1000))
for t in self.tests:
if t.can_run is False:
@@ -442,7 +456,8 @@
(", ".join([str(v) for v in t.test_config.maxversion]),
t.test_config.maxversion_check))
for f in t.test_config.features:
- print "--- --> Version Feature: %s - %s" % (f, str(t.test_config.feature_check[f]))
+ print "--- --> Version Feature: %s - %s" % (
+ f, str(t.test_config.feature_check[f]))
print "--- --> Tags: %s" % (t.test_config.tags)
for d in t.test_config.deps:
print "--- --> Dependency: %s - %s" % (d.name, str(d.met))
@@ -464,12 +479,14 @@
else:
# Establish Preconditions
print "Making sure Asterisk isn't running ..."
- if os.system("if pidof asterisk >/dev/null; then killall -9 asterisk >/dev/null 2>&1; "
- "sleep 1; ! pidof asterisk >/dev/null; fi"):
+ if os.system("if pidof asterisk >/dev/null; then "
+ "killall -9 asterisk >/dev/null 2>&1; "
+ "sleep 1; ! pidof asterisk >/dev/null; fi"):
print "Could not kill asterisk."
print "Making sure SIPp isn't running..."
- if os.system("if pidof sipp >/dev/null; then killall -9 sipp >/dev/null 2>&1; "
- "sleep 1; ! pidof sipp >/dev/null; fi"):
+ if os.system("if pidof sipp >/dev/null; then "
+ "killall -9 sipp >/dev/null 2>&1; "
+ "sleep 1; ! pidof sipp >/dev/null; fi"):
print "Could not kill sipp."
# XXX TODO Hard coded path, gross.
os.system("rm -f /var/run/asterisk/asterisk.ctl")
@@ -539,7 +556,8 @@
continue
failure = doc.createElement("failure")
- failure.appendChild(doc.createTextNode(self.__strip_illegal_xml_chars(t.failure_message)))
+ failure.appendChild(doc.createTextNode(
+ self.__strip_illegal_xml_chars(t.failure_message)))
tc.appendChild(failure)
doc.writexml(f, addindent=" ", newl="\n", encoding="utf-8")
@@ -557,33 +575,33 @@
parser = optparse.OptionParser(usage=usage)
parser.add_option("-l", "--list-tests", action="store_true",
- dest="list_tests", default=False,
- help="List tests instead of running them.")
+ dest="list_tests", default=False,
+ help="List tests instead of running them.")
parser.add_option("-t", "--test", action="append", default=[],
- dest="tests",
- help=("Run a single specified test (directory) instead of all tests. "
- "May be specified more than once."))
+ dest="tests",
+ help=("Run a single specified test (directory) instead "
+ "of all tests. May be specified more than once."))
parser.add_option("-g", "--tag", action="append",
- dest="tags",
- help="Specify one or more tags to select a subset of tests.")
+ dest="tags",
+ help="Specify one or more tags to select a subset of tests.")
parser.add_option("-v", "--version",
- dest="version", default=None,
- help="Specify the version of Asterisk rather then detecting it.")
+ dest="version", default=None,
+ help="Specify the version of Asterisk rather then detecting it.")
parser.add_option("-L", "--list-tags", action="store_true",
- dest="list_tags", default=False,
- help="List available tags")
+ dest="list_tags", default=False,
+ help="List available tags")
parser.add_option("-n", "--dry-run", action="store_true",
- dest="dry_run", default=False,
- help="Only show which tests would be run.")
+ dest="dry_run", default=False,
+ help="Only show which tests would be run.")
parser.add_option("--timeout", metavar='int', type=int,
- dest="timeout", default=-1,
- help="Abort test after n seconds of no output.")
+ dest="timeout", default=-1,
+ help="Abort test after n seconds of no output.")
parser.add_option("-V", "--valgrind", action="store_true",
- dest="valgrind", default=False,
- help="Run Asterisk under Valgrind")
+ dest="valgrind", default=False,
+ help="Run Asterisk under Valgrind")
parser.add_option("-c", "--cleanup", action="store_true",
- dest="cleanup", default=False,
- help="Cleanup tmp directory after each successful test")
+ dest="cleanup", default=False,
+ help="Cleanup tmp directory after each successful test")
(options, args) = parser.parse_args(argv)
ast_version = AsteriskVersion(options.version)
@@ -609,7 +627,8 @@
if options.valgrind:
if not ET:
- print "python lxml module not loaded, text summaries from valgrind will not be produced.\n"
+ print "python lxml module not loaded, text summaries " \
+ "from valgrind will not be produced.\n"
os.environ["VALGRIND_ENABLE"] = "true"
print "Running tests for Asterisk %s ...\n" % str(ast_version)
@@ -627,8 +646,7 @@
if t.did_run is False:
print "SKIPPED"
for d in t.test_config.deps:
- print " --> Dependency: %s -- Met: %s" % (d.name,
- str(d.met))
+ print " --> Dependency: %s -- Met: %s" % (d.name, str(d.met))
if options.tags:
for t in t.test_config.tags:
print " --> Tag: %s -- Met: %s" % (t, str(t in options.tags))
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..e5cc8ca
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,2 @@
+[pep8]
+max-line-length=90
--
To view, visit https://gerrit.asterisk.org/40
To unsubscribe, visit https://gerrit.asterisk.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I55bcaab21c54f9040594f51c57f0efe30a219a62
Gerrit-PatchSet: 1
Gerrit-Project: testsuite
Gerrit-Branch: master
Gerrit-Owner: Corey Farrell <git at cfware.com>
More information about the asterisk-dev
mailing list