[svn-commits] dlee: branch dlee/ari-tests r3843 - in /asterisk/team/dlee/ari-tests: lib/pyt...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Tue Jun 11 17:00:46 CDT 2013


Author: dlee
Date: Tue Jun 11 17:00:45 2013
New Revision: 3843

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=3843
Log:
Clean up; PEP-8

Modified:
    asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py
    asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py

Modified: asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py?view=diff&rev=3843&r1=3842&r2=3843
==============================================================================
--- asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py (original)
+++ asterisk/team/dlee/ari-tests/lib/python/asterisk/ari.py Tue Jun 11 17:00:45 2013
@@ -21,32 +21,187 @@
 
 logger = logging.getLogger(__name__)
 
-class Range(object):
-    def __init__(self, min = 0, max = float("inf")):
-        self.min = min
-        self.max = max
-
-    def contains(self, v):
-        return self.min <= v <= self.max
-
-def decode_range(v):
-    if v is None:
-        # Unspecified; recieve at least one
-        return Range(1, float("inf"))
-    elif isinstance(v, int):
-        # Need exactly this many events
-        return Range(v, v)
-    elif v[0] == '<':
-        # Need at most this many events
-        return Range(0, int(v[1:]))
-    elif v[0] == '>':
-        # Need at least this many events
-        return Range(int(v[1:]), float("inf"))
-    else:
-        # Need exactly this many events
-        return Range(int(v), int(v))
+DEFAULT_PORT = 8088
+
+
+class WebSocketEventModule(object):
+    '''Module for capturing events from the ARI WebSocket
+    '''
+
+    def __init__(self, module_config, test_object):
+        '''Constructor.
+
+        :param module_config: Configuration dict parse from test-config.yaml.
+        :param test_object: Test control object.
+        '''
+        logger.info("WebSocketEventModule ctor")
+        self.host = '127.0.0.1'
+        self.port = DEFAULT_PORT
+        self.test_object = test_object
+        #: ARI interface object
+        self.ari = ARI(self.host, self.port)
+        #: Matchers for incoming events
+        self.event_matchers = [
+            EventMatcher(self.ari, e, test_object)
+            for e in module_config['events']]
+        apps = module_config['apps']
+        if isinstance(apps, list):
+            apps = ','.join(apps)
+        #: Twisted protocol factory for ARI WebSockets
+        self.factory = AriClientFactory(host=self.host, port=self.port,
+                                        apps=apps, on_event=self.on_event)
+
+    def on_event(self, event):
+        '''Handle incoming events from the WebSocket.
+
+        :param event: Dictionary parsed from incoming JSON event.
+        '''
+        for matcher in self.event_matchers:
+            matcher.on_event(event)
+
+
+class AriClientFactory(WebSocketClientFactory):
+    '''Twisted protocol factory for building ARI WebSocket clients.
+    '''
+    def __init__(self, host, apps, on_event, port=DEFAULT_PORT,
+                 timeout_secs=15):
+        '''Constructor
+
+        :param host: Hostname of Asterisk.
+        :param apps: App names to subscribe to.
+        :param on_event: Callback to invoke for all received events.
+        :param port: Port of Asterisk web server.
+        :param timeout_secs: Maximum time to try to connect to Asterisk.
+        '''
+        url = "ws://%s:%d/ws?%s" % \
+              (host, port, urllib.urlencode({'app': apps}))
+        WebSocketClientFactory.__init__(self, url, protocols=["stasis"])
+        self.on_event = on_event
+        self.timeout_secs = timeout_secs
+        self.protocol = self.__build_protocol
+        self.attempts = 0
+        self.start = None
+
+        self.reconnect()
+
+    def __build_protocol(self):
+        '''Build a client protocol instance
+        '''
+        return AriClientProtocol(self.on_event)
+
+    def clientConnectionFailed(self, connector, reason):
+        '''Callback when client connection failed to connect.
+
+        :param connector: Twisted connector.
+        :param reason: Failure reason.
+        '''
+        logger.info("clientConnectionFailed(%s)" % (reason))
+        reactor.callLater(1, self.reconnect)
+
+    def reconnect(self):
+        '''Attempt to reconnect the ARI WebSocket.
+
+        This call will give up after timeout_secs has been exceeded.
+        '''
+        self.attempts += 1
+        logger.debug("WebSocket attempt #%d" % self.attempts)
+        if not self.start:
+            self.start = datetime.datetime.now()
+        runtime = (datetime.datetime.now() - self.start).seconds
+        if runtime >= self.timeout_secs:
+            logger.error("  Giving up after %d seconds" % self.timeout_secs)
+            return
+
+        connectWS(self)
+
+
+class AriClientProtocol(WebSocketClientProtocol):
+    '''Twisted protocol for handling a ARI WebSocket connection.
+    '''
+    def __init__(self, on_event):
+        '''Constructor.
+
+        :param on_event: Callback to invoke with each parsed event.
+        '''
+        self.on_event = on_event
+
+    def onOpen(self):
+        '''Called back when connection is open.
+        '''
+        logger.debug("onOpen()")
+
+    def onClose(self, wasClean, code, reason):
+        '''Called back when connection is closed.
+        '''
+        logger.debug("onClose(%r, %d, %s)" % (wasClean, code, reason))
+        reactor.callLater(1, self.factory.reconnect)
+
+    def onMessage(self, msg, binary):
+        '''Called back when message is received.
+
+        :param msg: Received text message.
+        '''
+        self.on_event(json.loads(msg))
+
+
+class ARI(object):
+    '''Bare bones object for an ARI interface.
+    '''
+
+    def __init__(self, host, port=DEFAULT_PORT):
+        '''Constructor.
+
+        :param host: Hostname of Asterisk.
+        :param port: Port of the Asterisk webserver.
+        '''
+        self.base_url = "http://%s:%d/stasis" % (host, port)
+
+    def build_url(self, *args):
+        '''Build a URL from the given path.
+
+        For example::
+            # Builds the URL for /channels/{channel_id}/answer
+            ari.build_url('channels', channel_id, 'answer')
+
+        :param args: Path segments.
+        '''
+        path = [str(arg) for arg in args]
+        return '/'.join([self.base_url] + path)
+
+    def get(self, *args, **kwargs):
+        '''Send a GET request to ARI.
+
+        :param args: Path segements.
+        :param kwargs: Query parameters.
+        '''
+        url = self.build_url(*args, **kwargs)
+        logger.info("GET %s %r" % (url, kwargs))
+        return requests.get(url, params=kwargs)
+
+    def post(self, *args, **kwargs):
+        '''Send a POST request to ARI.
+
+        :param args: Path segements.
+        :param kwargs: Query parameters.
+        '''
+        url = self.build_url(*args, **kwargs)
+        logger.info("POST %s %r" % (url, kwargs))
+        return requests.post(url, params=kwargs)
+
+    def delete(self, *args, **kwargs):
+        '''Send a DELETE request to ARI.
+
+        :param args: Path segements.
+        :param kwargs: Query parameters.
+        '''
+        url = self.build_url(*args, **kwargs)
+        logger.info("DELETE %s %r" % (url, kwargs))
+        return requests.delete(url, params=kwargs)
+
 
 class EventMatcher(object):
+    '''Object to observe incoming events and match them agains a configuration.
+    '''
     def __init__(self, ari, instance_config, test_object):
         self.ari = ari
         self.instance_config = instance_config
@@ -66,6 +221,10 @@
         test_object.register_stop_observer(self.on_stop)
 
     def on_event(self, message):
+        '''Callback for every received ARI event.
+
+        :param message: Parsed event from ARI WebSocket.
+        '''
         if self.matches(message):
             self.count += 1
             # Split call and accumulation to always call the callback
@@ -76,10 +235,15 @@
                                  self.instance_config)
                 self.passed = self.passed and res
             except:
-                logger.error("Exception in callback: %s" % traceback.format_exc())
+                logger.error("Exception in callback: %s" %
+                             traceback.format_exc())
                 self.passed = False
 
     def on_stop(self, *args):
+        '''Callback for the end of the test.
+
+        :param args: Ignored arguments.
+        '''
         if not self.count_range.contains(self.count):
             logger.error("Expected %d <= count <= %d; was %d (%r)",
                          self.count_range.min, self.count_range.max,
@@ -88,7 +252,11 @@
         self.test_object.set_passed(self.passed)
 
     def matches(self, message):
-        # Validate the match
+        '''Compares a message against the configured conditions.
+
+        :param message: Incoming ARI WebSocket event.
+        :returns: True if message matches conditions; False otherwise.
+        '''
         match = self.conditions.get('match')
         res = all_match(match, message)
 
@@ -98,118 +266,79 @@
             res = not all_match(nomatch, message)
         return res
 
+
 def all_match(pattern, message):
+    '''Match a pattern from the YAML config with a received message.
+
+    :param pattern: Configured pattern.
+    :param message: Message to compare.
+    :returns: True if message matches pattern; False otherwise.
+    '''
     #logger.debug("%r ?= %r" % (pattern, message))
     if pattern is None:
+        # Empty pattern always matches
         return True
     elif isinstance(pattern, list):
+        # List must be an exact match
         res = len(pattern) == len(message)
         i = 0
         while res and i < len(pattern):
             res = all_match(pattern[i], message[i])
         return res
     elif isinstance(pattern, dict):
+        # Dict should match for every field in the pattern.
+        # extra fields in the message are fine.
         for key, value in pattern.iteritems():
             to_check = message.get(key)
             if to_check is None or not all_match(value, to_check):
                 return False
         return True
     elif isinstance(pattern, str):
+        # Pattern strings are considered to be regexes
         return re.match(pattern, message) is not None
     elif isinstance(pattern, int):
+        # Integers are literal matches
         return pattern == message
     else:
         logger.error("Unhandled pattern type %s" % type(pattern)).__name__
 
 
-class StasisClientProtocol(WebSocketClientProtocol):
-    def __init__(self, on_event):
-        self.on_event = on_event
-
-    def onOpen(self):
-        logger.debug("onOpen()")
-
-    def onClose(self, wasClean, code, reason):
-        logger.debug("onClose(%r, %d, %s)" % (wasClean, code, reason))
-        reactor.callLater(1, self.factory.reconnect)
-
-    def onMessage(self, msg, binary):
-        self.on_event(json.loads(msg))
-
-
-class StasisClientFactory(WebSocketClientFactory):
-    def __init__(self, host, port, apps, on_event, timeout_secs=15):
-        url = "ws://%s:%d/ws?%s" % \
-              (host, port, urllib.urlencode({'app': apps}))
-        WebSocketClientFactory.__init__(self, url, protocols=["stasis"])
-        self.on_event = on_event
-        self.timeout_secs = timeout_secs
-        self.protocol = self.__build_protocol
-        self.attempts = 0
-        self.start = None
-
-        self.reconnect()
-
-    def __build_protocol(self):
-        return StasisClientProtocol(self.on_event)
-
-    def clientConnectionFailed(self, connector, reason):
-        logger.info("clientConnectionFailed(%s)" % (reason))
-        reactor.callLater(1, self.reconnect)
-
-    def reconnect(self):
-        self.attempts += 1
-        logger.debug("WebSocket attempt #%d" % self.attempts)
-        if not self.start:
-            self.start = datetime.datetime.now()
-        runtime = (datetime.datetime.now() - self.start).seconds
-        if runtime >= self.timeout_secs:
-            logger.error("  Giving up after %d seconds" % self.timeout_secs)
-        else:
-            connectWS(self)
-
-
-class ARI(object):
-    def __init__(self, host, port):
-        self.base_url = "http://%s:%d/stasis" % (host, port)
-
-    def build_url(self, *args, **kwargs):
-        url = '/'.join([self.base_url] + list(args))
-        params = urllib.urlencode(kwargs)
-        return "%s?%s" % (url, params)
-
-    def get(self, *args, **kwargs):
-        url = self.build_url(*args, **kwargs)
-        logger.info("GET %s" % url)
-        return requests.get()
-
-    def post(self, *args, **kwargs):
-        url = self.build_url(*args, **kwargs)
-        logger.info("POST %s" % url)
-        return requests.post(url)
-
-    def delete(self, *args, **kwargs):
-        url = self.build_url(*args, **kwargs)
-        logger.info("DELETE %s" % url)
-        return requests.delete(url)
-
-
-class WebSocketEventModule(object):
-    def __init__(self, module_config, test_object):
-        logger.info("WebSocketEventModule ctor")
-        self.host = '127.0.0.1'
-        self.port = 8088
-        self.test_object = test_object
-        self.ari = ARI(self.host, self.port)
-        self.event_matchers = [
-            EventMatcher(self.ari, e, test_object)
-            for e in module_config['events']]
-        apps = module_config['apps']
-        if isinstance(apps, list):
-            apps = ','.join(apps)
-        self.factory = StasisClientFactory(host=self.host, port=8088, apps=apps,
-                                           on_event=self.on_event)
-
-    def on_event(self, event):
-        for matcher in self.event_matchers:
-            matcher.on_event(event)
+class Range(object):
+    '''Utility object to handle numeric ranges (inclusive).
+    '''
+    def __init__(self, min=0, max=float("inf")):
+        '''Constructor.
+
+        :param min: Minimum value of the range.
+        :param max: Maximum value of the range.
+        '''
+        self.min = min
+        self.max = max
+
+    def contains(self, v):
+        '''Checks if the given value is within this Range.
+
+        :param v: Value to check.
+        :returns: True/False if v is/isn't in the Range.
+        '''
+        return self.min <= v <= self.max
+
+
+def decode_range(yaml):
+    '''Parse a range from YAML specification.
+    '''
+    if yaml is None:
+        # Unspecified; receive at least one
+        return Range(1, float("inf"))
+    elif isinstance(yaml, int):
+        # Need exactly this many events
+        return Range(yaml, yaml)
+    elif yaml[0] == '<':
+        # Need at most this many events
+        return Range(0, int(yaml[1:]))
+    elif yaml[0] == '>':
+        # Need at least this many events
+        return Range(int(yaml[1:]), float("inf"))
+    else:
+        # Need exactly this many events
+        return Range(int(yaml), int(yaml))

Modified: asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py?view=diff&rev=3843&r1=3842&r2=3843
==============================================================================
--- asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py (original)
+++ asterisk/team/dlee/ari-tests/tests/rest_api/continue/rest_continue.py Tue Jun 11 17:00:45 2013
@@ -12,15 +12,17 @@
 
 id = None
 
+
 def on_start(ari, event):
-    logger.debug("rest_continue.on_start(%r)" % event)
+    logger.debug("on_start(%r)" % event)
     global id
     id = event['stasis_start']['channel']['uniqueid']
     resp = ari.post('channels', id, 'continue')
     resp.raise_for_status()
     return True
 
+
 def on_end(ari, event):
-    logger.debug("rest_continue.on_end(%r)" % event)
+    logger.debug("on_end(%r)" % event)
     global id
     return id == event['stasis_end']['channel']['uniqueid']




More information about the svn-commits mailing list