[svn-commits] dlee: branch dlee/ari-tests r3843 - in /asterisk/team/dlee/ari-tests: lib/pyt...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Tue Jun 11 17:00:46 CDT 2013
Author: dlee
Date: Tue Jun 11 17:00:45 2013
New Revision: 3843
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=3843
Log:
Clean up; PEP-8
Modified:
asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py
asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py
Modified: asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py?view=diff&rev=3843&r1=3842&r2=3843
==============================================================================
--- asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py (original)
+++ asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py Tue Jun 11 17:00:45 2013
@@ -21,32 +21,187 @@
logger = logging.getLogger(__name__)
-class Range(object):
- def __init__(self, min = 0, max = float("inf")):
- self.min = min
- self.max = max
-
- def contains(self, v):
- return self.min <= v <= self.max
-
-def decode_range(v):
- if v is None:
- # Unspecified; recieve at least one
- return Range(1, float("inf"))
- elif isinstance(v, int):
- # Need exactly this many events
- return Range(v, v)
- elif v[0] == '<':
- # Need at most this many events
- return Range(0, int(v[1:]))
- elif v[0] == '>':
- # Need at least this many events
- return Range(int(v[1:]), float("inf"))
- else:
- # Need exactly this many events
- return Range(int(v), int(v))
+DEFAULT_PORT = 8088
+
+
+class WebSocketEventModule(object):
+ '''Module for capturing events from the ARI WebSocket
+ '''
+
+ def __init__(self, module_config, test_object):
+ '''Constructor.
+
+ :param module_config: Configuration dict parsed from test-config.yaml.
+ :param test_object: Test control object.
+ '''
+ logger.info("WebSocketEventModule ctor")
+ self.host = '127.0.0.1'
+ self.port = DEFAULT_PORT
+ self.test_object = test_object
+ #: ARI interface object
+ self.ari = ARI(self.host, self.port)
+ #: Matchers for incoming events
+ self.event_matchers = [
+ EventMatcher(self.ari, e, test_object)
+ for e in module_config['events']]
+ apps = module_config['apps']
+ if isinstance(apps, list):
+ apps = ','.join(apps)
+ #: Twisted protocol factory for ARI WebSockets
+ self.factory = AriClientFactory(host=self.host, port=self.port,
+ apps=apps, on_event=self.on_event)
+
+ def on_event(self, event):
+ '''Handle incoming events from the WebSocket.
+
+ :param event: Dictionary parsed from incoming JSON event.
+ '''
+ for matcher in self.event_matchers:
+ matcher.on_event(event)
+
+
+class AriClientFactory(WebSocketClientFactory):
+ '''Twisted protocol factory for building ARI WebSocket clients.
+ '''
+ def __init__(self, host, apps, on_event, port=DEFAULT_PORT,
+ timeout_secs=15):
+ '''Constructor
+
+ :param host: Hostname of Asterisk.
+ :param apps: App names to subscribe to.
+ :param on_event: Callback to invoke for all received events.
+ :param port: Port of Asterisk web server.
+ :param timeout_secs: Maximum time to try to connect to Asterisk.
+ '''
+ url = "ws://%s:%d/ws?%s" % \
+ (host, port, urllib.urlencode({'app': apps}))
+ WebSocketClientFactory.__init__(self, url, protocols=["stasis"])
+ self.on_event = on_event
+ self.timeout_secs = timeout_secs
+ self.protocol = self.__build_protocol
+ self.attempts = 0
+ self.start = None
+
+ self.reconnect()
+
+ def __build_protocol(self):
+ '''Build a client protocol instance
+ '''
+ return AriClientProtocol(self.on_event)
+
+ def clientConnectionFailed(self, connector, reason):
+ '''Callback when client connection failed to connect.
+
+ :param connector: Twisted connector.
+ :param reason: Failure reason.
+ '''
+ logger.info("clientConnectionFailed(%s)" % (reason))
+ reactor.callLater(1, self.reconnect)
+
+ def reconnect(self):
+ '''Attempt to reconnect the ARI WebSocket.
+
+ This call will give up after timeout_secs has been exceeded.
+ '''
+ self.attempts += 1
+ logger.debug("WebSocket attempt #%d" % self.attempts)
+ if not self.start:
+ self.start = datetime.datetime.now()
+ runtime = (datetime.datetime.now() - self.start).seconds
+ if runtime >= self.timeout_secs:
+ logger.error(" Giving up after %d seconds" % self.timeout_secs)
+ return
+
+ connectWS(self)
+
+
+class AriClientProtocol(WebSocketClientProtocol):
+ '''Twisted protocol for handling an ARI WebSocket connection.
+ '''
+ def __init__(self, on_event):
+ '''Constructor.
+
+ :param on_event: Callback to invoke with each parsed event.
+ '''
+ self.on_event = on_event
+
+ def onOpen(self):
+ '''Called back when connection is open.
+ '''
+ logger.debug("onOpen()")
+
+ def onClose(self, wasClean, code, reason):
+ '''Called back when connection is closed.
+ '''
+ logger.debug("onClose(%r, %d, %s)" % (wasClean, code, reason))
+ reactor.callLater(1, self.factory.reconnect)
+
+ def onMessage(self, msg, binary):
+ '''Called back when message is received.
+
+ :param msg: Received text message.
+ '''
+ self.on_event(json.loads(msg))
+
+
+class ARI(object):
+ '''Bare bones object for an ARI interface.
+ '''
+
+ def __init__(self, host, port=DEFAULT_PORT):
+ '''Constructor.
+
+ :param host: Hostname of Asterisk.
+ :param port: Port of the Asterisk webserver.
+ '''
+ self.base_url = "http://%s:%d/stasis" % (host, port)
+
+ def build_url(self, *args):
+ '''Build a URL from the given path.
+
+ For example::
+ # Builds the URL for /channels/{channel_id}/answer
+ ari.build_url('channels', channel_id, 'answer')
+
+ :param args: Path segments.
+ '''
+ path = [str(arg) for arg in args]
+ return '/'.join([self.base_url] + path)
+
+ def get(self, *args, **kwargs):
+ '''Send a GET request to ARI.
+
+ :param args: Path segments.
+ :param kwargs: Query parameters.
+ '''
+ url = self.build_url(*args, **kwargs)
+ logger.info("GET %s %r" % (url, kwargs))
+ return requests.get(url, params=kwargs)
+
+ def post(self, *args, **kwargs):
+ '''Send a POST request to ARI.
+
+ :param args: Path segments.
+ :param kwargs: Query parameters.
+ '''
+ url = self.build_url(*args, **kwargs)
+ logger.info("POST %s %r" % (url, kwargs))
+ return requests.post(url, params=kwargs)
+
+ def delete(self, *args, **kwargs):
+ '''Send a DELETE request to ARI.
+
+ :param args: Path segments.
+ :param kwargs: Query parameters.
+ '''
+ url = self.build_url(*args, **kwargs)
+ logger.info("DELETE %s %r" % (url, kwargs))
+ return requests.delete(url, params=kwargs)
+
class EventMatcher(object):
+ '''Object to observe incoming events and match them against a configuration.
+ '''
def __init__(self, ari, instance_config, test_object):
self.ari = ari
self.instance_config = instance_config
@@ -66,6 +221,10 @@
test_object.register_stop_observer(self.on_stop)
def on_event(self, message):
+ '''Callback for every received ARI event.
+
+ :param message: Parsed event from ARI WebSocket.
+ '''
if self.matches(message):
self.count += 1
# Split call and accumulation to always call the callback
@@ -76,10 +235,15 @@
self.instance_config)
self.passed = self.passed and res
except:
- logger.error("Exception in callback: %s" % traceback.format_exc())
+ logger.error("Exception in callback: %s" %
+ traceback.format_exc())
self.passed = False
def on_stop(self, *args):
+ '''Callback for the end of the test.
+
+ :param args: Ignored arguments.
+ '''
if not self.count_range.contains(self.count):
logger.error("Expected %d <= count <= %d; was %d (%r)",
self.count_range.min, self.count_range.max,
@@ -88,7 +252,11 @@
self.test_object.set_passed(self.passed)
def matches(self, message):
- # Validate the match
+ '''Compares a message against the configured conditions.
+
+ :param message: Incoming ARI WebSocket event.
+ :returns: True if message matches conditions; False otherwise.
+ '''
match = self.conditions.get('match')
res = all_match(match, message)
@@ -98,118 +266,79 @@
res = not all_match(nomatch, message)
return res
+
def all_match(pattern, message):
+ '''Match a pattern from the YAML config with a received message.
+
+ :param pattern: Configured pattern.
+ :param message: Message to compare.
+ :returns: True if message matches pattern; False otherwise.
+ '''
#logger.debug("%r ?= %r" % (pattern, message))
if pattern is None:
+ # Empty pattern always matches
return True
elif isinstance(pattern, list):
+ # List must be an exact match
res = len(pattern) == len(message)
i = 0
while res and i < len(pattern):
res = all_match(pattern[i], message[i])
return res
elif isinstance(pattern, dict):
+ # Dict should match for every field in the pattern.
+ # extra fields in the message are fine.
for key, value in pattern.iteritems():
to_check = message.get(key)
if to_check is None or not all_match(value, to_check):
return False
return True
elif isinstance(pattern, str):
+ # Pattern strings are considered to be regexes
return re.match(pattern, message) is not None
elif isinstance(pattern, int):
+ # Integers are literal matches
return pattern == message
else:
logger.error("Unhandled pattern type %s" % type(pattern)).__name__
-class StasisClientProtocol(WebSocketClientProtocol):
- def __init__(self, on_event):
- self.on_event = on_event
-
- def onOpen(self):
- logger.debug("onOpen()")
-
- def onClose(self, wasClean, code, reason):
- logger.debug("onClose(%r, %d, %s)" % (wasClean, code, reason))
- reactor.callLater(1, self.factory.reconnect)
-
- def onMessage(self, msg, binary):
- self.on_event(json.loads(msg))
-
-
-class StasisClientFactory(WebSocketClientFactory):
- def __init__(self, host, port, apps, on_event, timeout_secs=15):
- url = "ws://%s:%d/ws?%s" % \
- (host, port, urllib.urlencode({'app': apps}))
- WebSocketClientFactory.__init__(self, url, protocols=["stasis"])
- self.on_event = on_event
- self.timeout_secs = timeout_secs
- self.protocol = self.__build_protocol
- self.attempts = 0
- self.start = None
-
- self.reconnect()
-
- def __build_protocol(self):
- return StasisClientProtocol(self.on_event)
-
- def clientConnectionFailed(self, connector, reason):
- logger.info("clientConnectionFailed(%s)" % (reason))
- reactor.callLater(1, self.reconnect)
-
- def reconnect(self):
- self.attempts += 1
- logger.debug("WebSocket attempt #%d" % self.attempts)
- if not self.start:
- self.start = datetime.datetime.now()
- runtime = (datetime.datetime.now() - self.start).seconds
- if runtime >= self.timeout_secs:
- logger.error(" Giving up after %d seconds" % self.timeout_secs)
- else:
- connectWS(self)
-
-
-class ARI(object):
- def __init__(self, host, port):
- self.base_url = "http://%s:%d/stasis" % (host, port)
-
- def build_url(self, *args, **kwargs):
- url = '/'.join([self.base_url] + list(args))
- params = urllib.urlencode(kwargs)
- return "%s?%s" % (url, params)
-
- def get(self, *args, **kwargs):
- url = self.build_url(*args, **kwargs)
- logger.info("GET %s" % url)
- return requests.get()
-
- def post(self, *args, **kwargs):
- url = self.build_url(*args, **kwargs)
- logger.info("POST %s" % url)
- return requests.post(url)
-
- def delete(self, *args, **kwargs):
- url = self.build_url(*args, **kwargs)
- logger.info("DELETE %s" % url)
- return requests.delete(url)
-
-
-class WebSocketEventModule(object):
- def __init__(self, module_config, test_object):
- logger.info("WebSocketEventModule ctor")
- self.host = '127.0.0.1'
- self.port = 8088
- self.test_object = test_object
- self.ari = ARI(self.host, self.port)
- self.event_matchers = [
- EventMatcher(self.ari, e, test_object)
- for e in module_config['events']]
- apps = module_config['apps']
- if isinstance(apps, list):
- apps = ','.join(apps)
- self.factory = StasisClientFactory(host=self.host, port=8088, apps=apps,
- on_event=self.on_event)
-
- def on_event(self, event):
- for matcher in self.event_matchers:
- matcher.on_event(event)
+class Range(object):
+ '''Utility object to handle numeric ranges (inclusive).
+ '''
+ def __init__(self, min=0, max=float("inf")):
+ '''Constructor.
+
+ :param min: Minimum value of the range.
+ :param max: Maximum value of the range.
+ '''
+ self.min = min
+ self.max = max
+
+ def contains(self, v):
+ '''Checks if the given value is within this Range.
+
+ :param v: Value to check.
+ :returns: True/False if v is/isn't in the Range.
+ '''
+ return self.min <= v <= self.max
+
+
+def decode_range(yaml):
+ '''Parse a range from YAML specification.
+ '''
+ if yaml is None:
+ # Unspecified; receive at least one
+ return Range(1, float("inf"))
+ elif isinstance(yaml, int):
+ # Need exactly this many events
+ return Range(yaml, yaml)
+ elif yaml[0] == '<':
+ # Need at most this many events
+ return Range(0, int(yaml[1:]))
+ elif yaml[0] == '>':
+ # Need at least this many events
+ return Range(int(yaml[1:]), float("inf"))
+ else:
+ # Need exactly this many events
+ return Range(int(yaml), int(yaml))
Modified: asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py?view=diff&rev=3843&r1=3842&r2=3843
==============================================================================
--- asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py (original)
+++ asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py Tue Jun 11 17:00:45 2013
@@ -12,15 +12,17 @@
id = None
+
def on_start(ari, event):
- logger.debug("rest_continue.on_start(%r)" % event)
+ logger.debug("on_start(%r)" % event)
global id
id = event['stasis_start']['channel']['uniqueid']
resp = ari.post('channels', id, 'continue')
resp.raise_for_status()
return True
+
def on_end(ari, event):
- logger.debug("rest_continue.on_end(%r)" % event)
+ logger.debug("on_end(%r)" % event)
global id
return id == event['stasis_end']['channel']['uniqueid']
More information about the svn-commits
mailing list