[asterisk-commits] Change in testsuite[master]: stasis: set a channel variable on websocket disconnect error
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Wed Apr 8 15:57:02 CDT 2015
Mark Michelson has submitted this change and it was merged.
Change subject: stasis: set a channel variable on websocket disconnect error
......................................................................
stasis: set a channel variable on websocket disconnect error
This test is to ensure Asterisk applies the correct state to the channel
variable, STASISSTATUS. STASISSTATUS was introduced as a means for Stasis
applications to have context when errors occur in Stasis that disrupt normal
processing.
The test scenarios:
1. The 'Babs' scenario:
a. A channel is originated through ARI referencing a subscribed app (Babs)
that was registered in Stasis during startup.
b. After Stasis is started, the channel is then hung up.
c. A check is made to ensure that the value of STASISSTATUS is SUCCESS.
2. The 'Bugs' scenario:
a. A channel is originated through ARI referencing a subscribed app
(BugsAlt) that was never registered in Stasis.
b. A check is then made to ensure that the value of STASISSTATUS is FAILED.
3. The 'Buster' scenario:
a. A channel is originated through ARI referencing a subscribed app
(Buster) that was registered in Stasis during startup.
b. While the channel from step 'a' is still active, the websocket is then
disconnected out from underneath ARI.
c. A new channel is originated through ARI, also referencing the subscribed
app (Buster) that was registered in Stasis during startup.
d. A check is then made to ensure that the value of STASISSTATUS is FAILED.
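
The expected outcome of each scenario can be summarized as a small lookup.
This is an illustrative sketch only: SCENARIOS and expected_status() are
hypothetical names that do not appear in the change, and the two-flag rule is
a simplification of the scenarios listed above, not Asterisk's actual logic.

```python
# Illustrative only: these names are hypothetical, not part of the testsuite.
SCENARIOS = {
    # name: (app registered in Stasis, websocket still connected)
    'Babs': (True, True),
    'Bugs': (False, True),
    'Buster': (True, False),
}


def expected_status(app_registered, websocket_connected):
    """Return the STASISSTATUS value the scenario checks for."""
    if app_registered and websocket_connected:
        return 'SUCCESS'
    return 'FAILED'


for name, flags in sorted(SCENARIOS.items()):
    print('{0}: {1}'.format(name, expected_status(*flags)))
```

Only the Babs scenario, where the app is registered and the websocket stays
up, is expected to end with SUCCESS.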
ASTERISK-24802
Reported By: Kevin Harwell
Change-Id: I0f7dadfd429bd30e9f07a531f47884d8c923fc13
---
A tests/rest_api/applications/stasisstatus/__init__.py
A tests/rest_api/applications/stasisstatus/ari_client.py
A tests/rest_api/applications/stasisstatus/configs/ast1/extensions.conf
A tests/rest_api/applications/stasisstatus/monitor.py
A tests/rest_api/applications/stasisstatus/observable_object.py
A tests/rest_api/applications/stasisstatus/run-test
A tests/rest_api/applications/stasisstatus/test-config.yaml
A tests/rest_api/applications/stasisstatus/test_case.py
A tests/rest_api/applications/stasisstatus/test_scenario.py
M tests/rest_api/applications/tests.yaml
10 files changed, 1,447 insertions(+), 0 deletions(-)
Approvals:
Mark Michelson: Looks good to me, approved; Verified
Matt Jordan: Looks good to me, but someone else must approve
diff --git a/tests/rest_api/applications/stasisstatus/__init__.py b/tests/rest_api/applications/stasisstatus/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/rest_api/applications/stasisstatus/__init__.py
diff --git a/tests/rest_api/applications/stasisstatus/ari_client.py b/tests/rest_api/applications/stasisstatus/ari_client.py
new file mode 100644
index 0000000..e7e2eb3
--- /dev/null
+++ b/tests/rest_api/applications/stasisstatus/ari_client.py
@@ -0,0 +1,404 @@
+#!/usr/bin/env python
+"""
+Copyright (C) 2015, Digium, Inc.
+Ashley Sanders <asanders at digium.com>
+
+This program is free software, distributed under the terms of
+the GNU General Public License Version 2.
+"""
+
+import sys
+import logging
+import uuid
+
+sys.path.append("lib/python")
+sys.path.append("tests/rest_api/applications")
+
+from asterisk.ari import ARI, AriClientFactory
+from stasisstatus.observable_object import ObservableObject
+from twisted.internet import defer, reactor
+
+LOGGER = logging.getLogger(__name__)
+
+
+class AriClient(ObservableObject):
+ """The ARI client.
+
+ This class serves as a facade for ARI and AriClientFactory. It is
+ responsible for creating and persisting the connection state needed to
+ execute a test scenario.
+ """
+
+ def __init__(self, host, port, credentials, name='testsuite'):
+ """Constructor.
+
+ Keyword Arguments:
+ host -- The [bindaddr] of the Asterisk HTTP web
+ server.
+ port -- The [bindport] of the Asterisk HTTP web
+ server.
+ credentials -- User credentials for ARI. A tuple.
+ E.g.: ('username', 'password').
+ name -- The name of the app to register in Stasis via
+ ARI (optional) (default 'testsuite').
+ """
+
+
+ super(AriClient, self).__init__(name, ['on_channelcreated',
+ 'on_channeldestroyed',
+ 'on_channelvarset',
+ 'on_client_start',
+ 'on_client_stop',
+ 'on_stasisend',
+ 'on_stasisstart',
+ 'on_ws_open',
+ 'on_ws_closed'])
+ self.__ari = None
+ self.__factory = None
+ self.__ws_client = None
+ self.__channels = []
+ self.__host = host
+ self.__port = port
+ self.__credentials = credentials
+
+ def connect_websocket(self):
+ """Creates an AriClientFactory instance and connects to it."""
+
+ def wait_for_it(deferred=None):
+ """Waits for the client to reset before connecting the web socket.
+
+ Keyword Arguments:
+ deferred -- The twisted.defer instance to use for
+ chaining callbacks (optional)
+ (default None).
+ """
+
+ msg = '{0} '.format(self)
+
+ if not deferred:
+ deferred = defer.Deferred()
+ if not self.clean:
+ LOGGER.debug(msg + 'I\'m not so fresh so clean.')
+ reactor.callLater(1, wait_for_it, deferred)
+ else:
+ LOGGER.debug(msg + 'Connecting web socket.')
+ self.__ari = ARI(self.__host, userpass=self.__credentials)
+ self.__factory = AriClientFactory(receiver=self,
+ host=self.__host,
+ port=self.__port,
+ apps=self.name,
+ userpass=self.__credentials)
+ deferred.callback(self.__factory.connect())
+
+ self.__reset()
+ wait_for_it()
+ return
+
+ def __delete_all_channels(self):
+ """Deletes all the channels."""
+
+ if len(self.__channels) == 0:
+ return
+
+ if self.__ari is not None:
+ allow_errors = self.__ari.allow_errors
+ self.__ari.set_allow_errors(True)
+ channels = list(self.__channels)
+ for channel in channels:
+ self.hangup_channel(channel)
+ self.__ari.set_allow_errors(allow_errors)
+ else:
+ del self.__channels[:]
+ return
+
+ def disconnect_websocket(self):
+ """Disconnects the web socket."""
+
+ msg = '{0} '.format(self)
+
+ if self.__ws_client is None:
+ info = 'Cannot disconnect; no web socket is connected.'
+ LOGGER.debug(msg + info)
+ return self
+
+ if self.__ari is not None:
+ warning = 'Disconnecting web socket with an active ARI connection.'
+ LOGGER.warn(msg + warning)
+
+ LOGGER.debug(msg + 'Disconnecting the web socket.')
+ self.__ws_client.transport.loseConnection()
+ return self
+
+ def hangup_channel(self, channel_id):
+ """Deletes a channel.
+
+ Keyword Arguments:
+ channel_id -- The id of the channel to delete.
+
+ Returns:
+ The JSON response object from the DELETE to ARI.
+
+ Raises:
+ ValueError
+ """
+
+ msg = '{0} '.format(self)
+
+ if self.__ari is None:
+ msg += 'Cannot hangup channel; ARI instance has no value.'
+ raise ValueError(msg)
+
+ LOGGER.debug(msg + 'Deleting channel [{0}].'.format(channel_id))
+
+ try:
+ self.__channels.remove(channel_id)
+ except ValueError:
+ pass
+
+ return self.__ari.delete('channels', channel_id)
+
+ def on_channelcreated(self, message):
+ """Callback for the ARI 'ChannelCreated' event.
+
+ Keyword Arguments:
+ message -- the JSON message
+ """
+
+ channel = message['channel']['id']
+ if channel not in self.__channels:
+ self.__channels.append(channel)
+
+ self.notify_observers('on_channelcreated', message)
+
+ def on_channeldestroyed(self, message):
+ """Callback for the ARI 'ChannelDestroyed' event.
+
+ Keyword Arguments:
+ message -- the JSON message
+ """
+
+ channel = message['channel']['id']
+ try:
+ self.__channels.remove(channel)
+ except ValueError:
+ pass
+
+ self.notify_observers('on_channeldestroyed', message)
+
+ def on_channelvarset(self, message):
+ """Callback for the ARI 'ChannelVarset' event.
+
+ Keyword Arguments:
+ message -- the JSON message
+ """
+
+ self.notify_observers('on_channelvarset', message)
+
+ def on_client_start(self):
+ """Notifies the observers of the 'on_client_start' event."""
+
+ LOGGER.debug('{0} Client is started.'.format(self))
+ self.notify_observers('on_client_start', None, True)
+
+ def on_client_stop(self):
+ """Notifies the observers of the 'on_client_stop' event."""
+
+ LOGGER.debug('{0} Client is stopped.'.format(self))
+ self.notify_observers('on_client_stop', None, True)
+
+ def on_stasisend(self, message):
+ """Callback for the ARI 'StasisEnd' event
+
+ Keyword Arguments:
+ message -- the JSON message
+ """
+
+ self.notify_observers('on_stasisend', message)
+
+ def on_stasisstart(self, message):
+ """Callback for the ARI 'StasisStart' event
+
+ Keyword Arguments:
+ message -- the JSON message
+ """
+
+ self.notify_observers('on_stasisstart', message)
+
+ def on_ws_closed(self, ws_client):
+ """Callback for AriClientProtocol 'onClose' handler.
+
+ Keyword Arguments:
+ ws_client -- The AriClientProtocol object that raised
+ the event.
+ """
+
+ LOGGER.debug('{0} WebSocket connection closed.'.format(self))
+ self.__ws_client = None
+ self.notify_observers('on_ws_closed', None)
+
+ def on_ws_event(self, message):
+ """Callback for AriClientProtocol 'onMessage' handler.
+
+ Keyword Arguments:
+ message -- The event payload.
+ """
+
+ LOGGER.debug("{0} In on_ws_event; message={1}".format(self, message))
+
+ event = 'on_{0}'.format(message.get('type').lower())
+
+ if event == 'on_ws_open' or event == 'on_ws_closed':
+ return
+
+ callback = getattr(self, event, None)
+ if callback and callable(callback):
+ callback(message)
+ self.notify_observers(event, message)
+
+ def on_ws_open(self, ws_client):
+ """Callback for AriClientProtocol 'onOpen' handler.
+
+ Keyword Arguments:
+ ws_client -- The AriClientProtocol object that raised
+ the event.
+ """
+
+ LOGGER.debug('{0} WebSocket connection opened.'.format(self))
+ self.__ws_client = ws_client
+ self.notify_observers('on_ws_open', None)
+ self.on_client_start()
+
+ def originate(self, endpoint, app=None):
+ """Originates a channel.
+
+ Keyword Arguments:
+ endpoint -- The endpoint to use for the ARI request.
+ app -- The name of the Stasis app (optional)
+ (default None).
+
+ Returns:
+ The JSON response object from the POST to ARI.
+
+ Raises:
+ ValueError
+ """
+
+ msg = '{0} '.format(self)
+
+ if self.__ari is None:
+ msg += 'Cannot originate channel; ARI instance has no value.'
+ raise ValueError(msg)
+
+ channel = dict()
+ if app is not None:
+ channel['app'] = app
+ channel['channelId'] = str(uuid.uuid4())
+ channel['endpoint'] = endpoint
+
+ msg += 'Originating channel [{0}].'
+ LOGGER.debug(msg.format(channel['channelId']))
+ return self.__ari.post('channels', **channel)
+
+ def __reset(self):
+ """Resets the AriClient to its initial state.
+
+ Returns:
+ None. Any tear down that is needed proceeds asynchronously.
+ """
+
+ if not self.clean:
+ LOGGER.debug('{0} About to reset my state!'.format(self))
+ self.__tear_down()
+ return
+
+ def start(self):
+ """Starts the client."""
+
+ LOGGER.debug('{0} Starting client connections.'.format(self))
+ self.connect_websocket()
+
+ def stop(self):
+ """Stops the client."""
+
+ LOGGER.debug('{0} Stopping client connections.'.format(self))
+ self.suspend()
+ self.__reset()
+
+ def __tear_down(self):
+ """Tears down the channels and web socket."""
+
+ def wait_for_it(deferred=None, run=0):
+ """Disposes each piece, one at a time.
+
+
+ The first run (run=0) initializes the deferred and kicks off
+ the process to destroy all of our channels.
+
+ The second run (run=1) waits for all the channels to be
+ destroyed then kicks off the process to disconnect the web socket.
+
+ The third run (run=2) waits for the web socket to
+ disconnect then cleans up the remaining state variables.
+
+ Keyword Arguments:
+ deferred -- The twisted.defer instance to use for
+ chaining callbacks (optional)
+ (default None).
+ run -- The current phase of tear down:
+ 0=Entry phase
+ 1=Waiting for ARI to destroy all channels
+ 2=Calls ARI to disconnect the web socket
+ 3=Waiting for ARI to disconnect the web
+ socket
+ """
+
+ msg = '{0} '.format(self)
+
+ if not deferred:
+ deferred = defer.Deferred()
+ self.suspend()
+ if run == 0:
+ LOGGER.debug(msg + 'Tearing down active connections.')
+ self.__delete_all_channels()
+ reactor.callLater(2, wait_for_it, deferred, 1)
+ elif run == 1:
+ waiting = len(self.__channels) > 0
+ if waiting:
+ msg += 'Waiting for channels to be destroyed.'
+ LOGGER.debug(msg)
+ reactor.callLater(2, wait_for_it, deferred, 1 if waiting else 2)
+ elif run == 2:
+ LOGGER.debug(msg + 'Disconnecting web socket.')
+ self.__ari = None
+ self.__factory = None
+ self.disconnect_websocket()
+ reactor.callLater(2, wait_for_it, deferred, 3)
+ elif run == 3:
+ if self.__ws_client is not None:
+ msg += 'Waiting for web socket to be destroyed.'
+ LOGGER.debug(msg)
+ reactor.callLater(2, wait_for_it, deferred, 3)
+ else:
+ LOGGER.debug(msg + 'Client successfully torn down.')
+ reactor.callLater(0, self.on_client_stop)
+ reactor.callLater(2, self.reset_registrar)
+ deferred.callback(self.resume())
+ wait_for_it()
+ return
+
+ @property
+ def clean(self):
+ """Returns True if the client has no orphaned connections
+ needing to be torn down. False otherwise."""
+
+ if len(self.__channels) == 0:
+ LOGGER.debug('{0} No channels!'.format(self))
+ if self.__ws_client is None:
+ LOGGER.debug('{0} No ws_client!'.format(self))
+ if self.__ari is None:
+ LOGGER.debug('{0} No ari!'.format(self))
+ if self.__factory is None:
+ LOGGER.debug('{0} No factory!'.format(self))
+ LOGGER.debug('{0} I\'m clean!'.format(self))
+ return True
+ return False
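
The phased tear down documented in ari_client.py amounts to a small state
machine that polls until each resource is gone. The sketch below models only
the phase transitions, with no Twisted involved; next_phase() is a
hypothetical helper, and the polling loop stands in for the delayed
reactor calls.

```python
def next_phase(run, channels_remaining, ws_connected):
    """Return the tear-down phase to schedule next, or None when done.

    Hypothetical helper; it mirrors the phases listed in the tear-down
    docstring rather than the Twisted-based implementation.
    """
    if run == 0:
        # Channel deletion has been requested; go and wait for it.
        return 1
    if run == 1:
        # Keep polling until every channel has been destroyed.
        return 1 if channels_remaining else 2
    if run == 2:
        # The disconnect has been requested; go and wait for it.
        return 3
    if run == 3:
        # Keep polling until the web socket is gone.
        return 3 if ws_connected else None
    raise ValueError('Unknown phase: {0!r}'.format(run))


# Walk one tear down in which a channel lingers for a single poll.
channel_checks = [True, False]  # successive "channels remaining?" answers
phase, trace = 0, []
while phase is not None:
    trace.append(phase)
    remaining = channel_checks.pop(0) if phase == 1 and channel_checks else False
    phase = next_phase(phase, remaining, ws_connected=False)
print(trace)
```

Each phase either repeats itself while it is still waiting or advances by
exactly one step, so only one chain of scheduled calls is ever in flight.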
diff --git a/tests/rest_api/applications/stasisstatus/configs/ast1/extensions.conf b/tests/rest_api/applications/stasisstatus/configs/ast1/extensions.conf
new file mode 100644
index 0000000..0bc27ab
--- /dev/null
+++ b/tests/rest_api/applications/stasisstatus/configs/ast1/extensions.conf
@@ -0,0 +1,28 @@
+[globals]
+
+[Acme]
+
+exten => _[Bb]abs,1,NoOp()
+ same => n,GoSub(subComicDispenser,1(${EXTEN}))
+ same => n,Hangup()
+
+exten => _[Bb]ugs,1,NoOp()
+ same => n,GoSub(subComicDispenser,1(${EXTEN}))
+ same => n,Hangup()
+
+exten => _[Bb]ugs[Aa]lt,1,NoOp()
+ same => n,GoSub(subComicDispenser,1(${EXTEN}))
+ same => n,Hangup()
+
+exten => _[Bb]uster,1,NoOp()
+ same => n,GoSub(subComicDispenser,1(${EXTEN}))
+ same => n,Hangup()
+
+exten => _[Bb]uster[Aa]lt,1,NoOp()
+ same => n,GoSub(subComicDispenser,1(Buster))
+ same => n,Hangup()
+
+exten => subComicDispenser,1,NoOp()
+ same => n,Answer()
+ same => n,Stasis(${ARG1})
+ same => n,Return()
\ No newline at end of file
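
The scenarios dial extensions such as 'Bugsalt' (the scenario name plus
'alt'), which have to match the bracketed patterns in the [Acme] context. The
check below is a rough approximation only: it borrows the character sets of
Python's fnmatch to stand in for Asterisk's pattern matcher, assumes a
case-sensitive comparison, and holds for these particular patterns only
because they contain no X, Z, N, '.' or '!' wildcards. matching_patterns() is
a hypothetical helper.

```python
from fnmatch import fnmatchcase

# Patterns copied from the [Acme] context, including the leading
# underscore that marks them as patterns.
PATTERNS = ['_[Bb]abs', '_[Bb]ugs', '_[Bb]ugs[Aa]lt',
            '_[Bb]uster', '_[Bb]uster[Aa]lt']


def matching_patterns(exten):
    """Return the patterns that would match exten (approximation only)."""
    # Strip the leading underscore before handing the pattern to fnmatch.
    return [p for p in PATTERNS if fnmatchcase(exten, p[1:])]


print(matching_patterns('Bugsalt'))    # dialed by the Bugs scenario
print(matching_patterns('Busteralt'))  # first channel of the Buster scenario
```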
diff --git a/tests/rest_api/applications/stasisstatus/monitor.py b/tests/rest_api/applications/stasisstatus/monitor.py
new file mode 100644
index 0000000..4504ac3
--- /dev/null
+++ b/tests/rest_api/applications/stasisstatus/monitor.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+"""
+Copyright (C) 2015, Digium, Inc.
+Ashley Sanders <asanders at digium.com>
+
+This program is free software, distributed under the terms of
+the GNU General Public License Version 2.
+"""
+
+import sys
+import logging
+
+sys.path.append("lib/python")
+sys.path.append("tests/rest_api/applications")
+
+from stasisstatus.observable_object import ObservableObject
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ChannelVariableMonitor(ObservableObject):
+ """Monitors the system for state changes for a given channel variable."""
+
+ def __init__(self, ami, variable, name):
+ """Constructor.
+
+ Keyword Arguments:
+ ami -- The AMI instance to monitor.
+ variable -- The name of the channel variable to monitor
+ (e.g. 'STASISSTATUS').
+ name -- The name of this ChannelVariableMonitor
+ instance.
+ """
+
+ super(ChannelVariableMonitor, self).__init__(name,
+ ['on_value_changed'])
+ self.__ami = ami
+ self.__captured_value = None
+ self.__channel_variable = variable
+ self.__monitored_channel = None
+
+ self.__ami.registerEvent('VarSet', self.__on_ami_varset)
+ self.__ami.registerEvent('UserEvent', self.__on_ami_user_event)
+
+ def __log_event(self, handler, event_data):
+ """Logs event messages.
+
+ Keyword Arguments:
+ handler -- The name of the event handler.
+ event_data -- The event payload or message.
+ """
+
+ LOGGER.debug('{0} In {1}; event data={2}'.format(self,
+ handler,
+ event_data))
+
+ def __on_ami_user_event(self, ami, message):
+ """Handles the AMI 'UserEvent' event.
+
+ Keyword Arguments:
+ ami -- The AMI instance.
+ message -- The event payload.
+ """
+
+ if message['uniqueid'] != self.__monitored_channel:
+ return
+
+ if message['userevent'] != 'StasisStatus':
+ return
+
+ self.captured_value = message['value']
+
+ def __on_ami_varset(self, ami, message):
+ """Handles the AMI 'VarSet' event.
+
+ Keyword Arguments:
+ ami -- The AMI instance.
+ message -- The event payload.
+ """
+
+ self.__log_event('__on_ami_varset', message)
+
+ msg = '{0} '.format(self)
+
+ if self.suspended:
+ LOGGER.debug(msg + 'Monitoring is suspended.')
+ return
+
+ if message['uniqueid'] != self.__monitored_channel:
+ return
+ if message['variable'] != self.__channel_variable:
+ return
+
+ self.captured_value = message['value']
+
+ def start(self, channel):
+ """Tells the monitor to start monitoring for the given channel.
+
+ Keyword Arguments:
+ channel -- The id of the channel to use for monitoring.
+ """
+
+ LOGGER.debug('{0} Monitoring starting for channel[{1}]'.format(self,
+ channel))
+ self.__monitored_channel = channel
+ self.activate()
+
+ @property
+ def captured_value(self):
+ """The current value captured for the monitored channel variable."""
+
+ return self.__captured_value
+
+ @captured_value.setter
+ def captured_value(self, value):
+ """Sets the captured value."""
+
+ self.__captured_value = value
+ LOGGER.debug('{0} {1}={2}.'.format(self,
+ self.__channel_variable,
+ self.__captured_value))
+ self.notify_observers('on_value_changed', None, False)
+
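
The VarSet handler in monitor.py is essentially a two-step filter: ignore
events for other channels, then ignore events for other variables. A minimal
sketch of that filter follows. capture_value() is a hypothetical function,
the message is assumed to be a plain dict with the lower-cased keys the
monitor reads, and the uniqueid values are made-up sample data.

```python
def capture_value(message, channel_id, variable):
    """Return the new value if the event concerns our channel variable.

    Hypothetical helper; returns None for events that should be ignored.
    """
    if message.get('uniqueid') != channel_id:
        return None
    if message.get('variable') != variable:
        return None
    return message.get('value')


# Made-up sample event, shaped like the payload the monitor inspects.
event = {'uniqueid': '1428526622.0',
         'variable': 'STASISSTATUS',
         'value': 'FAILED'}
print(capture_value(event, '1428526622.0', 'STASISSTATUS'))
print(capture_value(event, 'some-other-channel', 'STASISSTATUS'))
```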
diff --git a/tests/rest_api/applications/stasisstatus/observable_object.py b/tests/rest_api/applications/stasisstatus/observable_object.py
new file mode 100644
index 0000000..20f62f0
--- /dev/null
+++ b/tests/rest_api/applications/stasisstatus/observable_object.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+"""
+Copyright (C) 2015, Digium, Inc.
+Ashley Sanders <asanders at digium.com>
+
+This program is free software, distributed under the terms of
+the GNU General Public License Version 2.
+"""
+
+import sys
+import logging
+
+sys.path.append("lib/python")
+sys.path.append("tests/rest_api/applications")
+
+LOGGER = logging.getLogger(__name__)
+
+class ObservableObject(object):
+ """Definition for an observable object."""
+
+ def __init__(self, name, events):
+ """Constructor.
+
+ Keyword Arguments:
+ name -- The name of this ObservableObject.
+ events -- The events available for observing.
+ """
+
+ self.__name = name
+ self.__registrar = dict()
+ self.__suspended = 0
+
+ for event in events:
+ self.__registrar[event] = list()
+
+ def __format__(self, format_spec):
+ """Overrides default format handling for 'self'."""
+
+ return self.__class__.__name__ + '[' + self.name + ']:'
+
+ def activate(self):
+ """Activates the object notification system."""
+
+ self.__suspended = 0
+
+ def notify_observers(self, event, message, notify_on_suspend=False):
+ """Starts the chain of invocations for the callbacks.
+
+ Keyword Arguments:
+ event -- The name of the event being raised.
+ message -- The event payload.
+ notify_on_suspend -- Whether or not to override suspended
+ notification logic (optional) (default False).
+
+ Raises:
+ ValueError
+ """
+
+ msg = '{0} '.format(self)
+
+ if self.suspended and not notify_on_suspend:
+ LOGGER.debug(msg + " Suspended; cannot notify observers.")
+ return
+
+ if not self.__validate(event):
+ error = msg + 'Could not notify observers; Validation failed.'
+ raise ValueError(error)
+
+ for callback in self.__registrar[event]:
+ LOGGER.debug(msg + 'Invoking {0}'.format(callback))
+ callback(self, message)
+ return
+
+ def reset_registrar(self):
+ """Resets the registrar to its initial, empty state.
+
+ Note: This will reset the entire observer registrar.
+ """
+
+ msg = '{0} '.format(self)
+
+ LOGGER.debug(msg + 'Resetting the observer registrar')
+ for event in self.__registrar:
+ del self.__registrar[event][:]
+ LOGGER.debug(msg + 'Reset the observer registrar.')
+ return
+
+ def register_observers(self, event, observers):
+ """Registers an observer with the list of observers.
+
+ Keyword Arguments:
+ event -- The event to observe.
+ observers -- A list of callable observers or a single
+ callable observer.
+
+ Raises:
+ TypeError
+ ValueError
+ """
+
+ msg = '{0} '.format(self)
+ error = msg + 'Could not register observers'
+
+ if observers is None:
+ error += ' for event [{0}]; [Observers] is None.'.format(event)
+ return
+
+ if not self.__validate(event):
+ error += '; Validation failed.'
+ raise ValueError(error)
+
+ cache = list()
+ if callable(observers):
+ cache.append(observers)
+ elif isinstance(observers, list):
+ cache.extend(observers)
+ else:
+ msg += 'Cannot register observer {0} with registrar; [{1}] \
+ is an unsupported type.'
+ raise TypeError(msg.format(observers,
+ observers.__class__.__name__))
+
+ if self.__registrar[event] is None:
+ msg += 'Instantiating the observers for event {0}.'.format(event)
+ LOGGER.debug(msg)
+ self.__registrar[event] = list()
+ self.__registrar[event].extend(cache)
+ return
+
+ def resume(self):
+ """Resumes monitoring."""
+
+ self.__suspended = max(0, self.__suspended - 1)
+
+ def suspend(self):
+ """Suspends monitoring."""
+
+ self.__suspended += 1
+
+ def __validate(self, event):
+ """Validates the parameters for value.
+
+ Validates that a given event is registered.
+
+ event -- The event to validate.
+
+ Returns:
+ True if the event is registered, False otherwise.
+ """
+
+ valid = None
+ error = '{0} Cannot continue; '.format(self)
+
+ if not event:
+ valid = False
+ reason = 'No value provided for [%r]' % event
+ LOGGER.warn(error + reason)
+ elif event not in self.__registrar:
+ valid = False
+ reason = ('Registrar does not contain an entry for the '
+ 'event [{0}]'.format(event))
+ LOGGER.warn(error + reason)
+
+ return valid if valid is not None else True
+
+ @property
+ def name(self):
+ """The friendly name for this instance."""
+
+ return self.__name
+
+ @property
+ def suspended(self):
+ """Flag indicating that the scenario is being torn down."""
+
+ return self.__suspended > 0
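
The observer mechanics in observable_object.py (a registrar keyed by event
name plus a suspend counter) can be exercised in isolation. The class below
is a pared-down, hypothetical stand-in written for illustration; it is not
the ObservableObject from this change and omits its logging and validation.

```python
class MiniObservable(object):
    """Hypothetical, pared-down take on the observer pattern."""

    def __init__(self, events):
        self._registrar = dict((event, []) for event in events)
        self._suspended = 0

    def register(self, event, callback):
        if event not in self._registrar:
            raise ValueError('Unknown event: {0!r}'.format(event))
        self._registrar[event].append(callback)

    def suspend(self):
        self._suspended += 1

    def resume(self):
        self._suspended = max(0, self._suspended - 1)

    def notify(self, event, message, notify_on_suspend=False):
        # Suspended subjects stay silent unless the caller overrides it.
        if self._suspended and not notify_on_suspend:
            return
        for callback in self._registrar[event]:
            callback(self, message)


seen = []
subject = MiniObservable(['on_value_changed'])
subject.register('on_value_changed', lambda sender, msg: seen.append(msg))

subject.notify('on_value_changed', 'first')    # delivered
subject.suspend()
subject.notify('on_value_changed', 'second')   # dropped while suspended
subject.notify('on_value_changed', 'third', notify_on_suspend=True)
subject.resume()
print(seen)
```

Counting suspensions instead of storing a boolean lets nested suspend and
resume calls pair up without one caller re-enabling notifications early.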
diff --git a/tests/rest_api/applications/stasisstatus/run-test b/tests/rest_api/applications/stasisstatus/run-test
new file mode 100644
index 0000000..2d981a2
--- /dev/null
+++ b/tests/rest_api/applications/stasisstatus/run-test
@@ -0,0 +1,246 @@
+#!/usr/bin/env python
+"""
+Copyright (C) 2015, Digium, Inc.
+Ashley Sanders <asanders at digium.com>
+
+This program is free software, distributed under the terms of
+the GNU General Public License Version 2.
+"""
+
+import sys
+import logging
+
+sys.path.append("lib/python")
+sys.path.append("tests/rest_api/applications")
+
+from stasisstatus.ari_client import AriClient
+from stasisstatus.monitor import ChannelVariableMonitor
+from stasisstatus.test_case import StasisStatusTestCase
+from stasisstatus.test_scenario import TestScenario
+
+LOGGER = logging.getLogger(__name__)
+
+
+def build_scenarios(ami, host, port, credentials):
+ """Builds the scenarios.
+
+ Keyword Arguments:
+ ami -- The AMI instance for this factory.
+ host -- The [bindaddr] of the Asterisk HTTP web
+ server.
+ port -- The [bindport] of the Asterisk HTTP web
+ server.
+ credentials -- User credentials for ARI.
+ A tuple. E.g. ('username', 'password').
+
+ Returns:
+ A list of TestScenario objects.
+ """
+
+ scenarios = list()
+
+ for name in ['Babs', 'Bugs', 'Buster']:
+ client = AriClient(host,
+ port,
+ credentials,
+ name)
+ monitor = ChannelVariableMonitor(ami,
+ 'STASISSTATUS',
+ name)
+ scenario = globals()[name + "TestScenario"](ami, client, monitor, name)
+ scenarios.append(scenario)
+ return scenarios
+
+
+class BabsTestScenario(TestScenario):
+ """The 'Babs' TestScenario.
+
+ This scenario tests for the case where a call is originated under
+ normal operating conditions and then hung up to determine if Stasis
+ correctly assigns STASISSTATUS=SUCCESS.
+ """
+
+ def __init__(self, ami, ari_client, monitor, name):
+ """Constructor.
+
+ Keyword Arguments:
+ ami -- The AMI instance for this TestScenario.
+ ari_client -- The AriClient to use for executing the
+ TestStrategy commands.
+ monitor -- The ChannelVariableMonitor instance for this
+ TestScenario.
+ name -- The name for this TestScenario instance.
+ """
+
+ super(BabsTestScenario, self).__init__(ami,
+ ari_client,
+ monitor,
+ 'SUCCESS',
+ name)
+
+ self.__channel = None
+ self.__stasis_started = False
+ self.ari_client.register_observers('on_stasisstart',
+ self.__on_stasisstart)
+
+ def __on_stasisstart(self, sender, message):
+ """Handles the AriClient 'on_stasisstart' event.
+
+ Keyword Arguments:
+ sender -- The object that raised the event.
+ message -- The event payload.
+ """
+
+ if self.__stasis_started:
+ return
+
+ channel = message['channel']['id']
+ if channel == self.__channel:
+ self.__stasis_started = True
+ self.monitor.start(self.__channel)
+ self.ari_client.hangup_channel(self.__channel)
+
+ def run_strategy(self):
+ """Implements the run_strategy from the base class."""
+
+ msg = '{0} '.format(self)
+ LOGGER.debug(msg + 'About to originate a channel with an app that has \
+ been registered in Stasis.')
+ app = self.name
+ endpoint = 'LOCAL/{0}@Acme'.format(app)
+ resp = self.ari_client.originate(endpoint, app)
+ self.__channel = resp.json()['id']
+ LOGGER.debug(msg + 'Response was [%r].' % resp)
+
+
+class BugsTestScenario(TestScenario):
+ """The 'Bugs' TestScenario.
+
+ This scenario tests for the case where a call is originated for an
+ app that was never registered in Stasis to determine if Stasis correctly
+ identifies this as a failure and assigns STASISSTATUS=FAILED.
+ """
+
+ def __init__(self, ami, ari_client, monitor, name):
+ """Constructor.
+
+ Keyword Arguments:
+ ami -- The AMI instance for this TestScenario.
+ ari_client -- The AriClient to use for executing the
+ TestStrategy commands.
+ monitor -- The ChannelVariableMonitor instance for this
+ TestScenario.
+ name -- The name for this TestScenario instance.
+ """
+
+ super(BugsTestScenario, self).__init__(ami,
+ ari_client,
+ monitor,
+ 'FAILED',
+ name)
+
+ def run_strategy(self):
+ """Implements the run_strategy from the base class."""
+
+ msg = '{0} '.format(self)
+ LOGGER.debug(msg + 'About to originate a channel with an app that was \
+ never registered in Stasis.')
+ app = self.name + 'alt'
+ endpoint = 'LOCAL/{0}@Acme'.format(app)
+ resp = self.ari_client.originate(endpoint, app)
+ self.monitor.start(resp.json()['id'])
+ LOGGER.debug(msg + 'Response was [%r].' % resp)
+
+
+class BusterTestScenario(TestScenario):
+ """The 'Buster' TestScenario.
+
+ This scenario tests for the case where a Stasis app that was
+ registered when channel A was originated, but is no longer registered
+ when channel B is originated, to determine if Stasis correctly identifies
+ this as a failure and assigns STASISSTATUS=FAILED.
+ """
+
+ def __init__(self, ami, ari_client, monitor, name):
+ """Constructor.
+
+ Keyword Arguments:
+ ami -- The AMI instance for this TestScenario.
+ ari_client -- The AriClient to use for executing the
+ TestStrategy commands.
+ monitor -- The ChannelVariableMonitor instance for this
+ TestScenario.
+ name -- The name for this TestScenario instance.
+ """
+
+ super(BusterTestScenario, self).__init__(ami,
+ ari_client,
+ monitor,
+ 'FAILED',
+ name)
+
+ self.__ws_closed = False
+ self.ari_client.register_observers('on_ws_closed',
+ self.__on_ws_closed)
+
+ def __on_ws_closed(self, sender, message):
+ """Handles the AriClient 'on_ws_closed' event.
+
+ Keyword Arguments:
+ sender -- The object that raised the event.
+ message -- The event payload.
+ """
+
+ msg = '{0} '.format(self)
+
+ if self.suspended:
+ LOGGER.debug(msg + 'Scenario is suspended.')
+ return
+
+ if self.__ws_closed:
+ LOGGER.warn(msg + 'About to run duplicate scenario step.')
+
+ self.__ws_closed = True
+
+ msg = '{0} '.format(self)
+ LOGGER.debug(msg + 'In {0}; message={1}'.format('__on_ws_closed',
+ message))
+ LOGGER.debug(msg + 'About to originate a channel after the web socket \
+ has been disconnected.')
+ app = self.name
+ endpoint = 'LOCAL/{0}@Acme'.format(app)
+ resp = self.ari_client.originate(endpoint, app)
+ self.monitor.start(resp.json()['id'])
+ LOGGER.debug(msg + 'Response was [%r].' % resp)
+
+ def run_strategy(self):
+ """Implements the run_strategy from the base class."""
+
+ msg = '{0} '.format(self)
+ LOGGER.debug(msg + 'About to originate a channel with an app that has \
+ been registered in Stasis.')
+
+ app = self.name + 'alt'
+ endpoint = 'LOCAL/{0}@Acme'.format(app)
+ resp = self.ari_client.originate(endpoint, app)
+ LOGGER.debug(msg + 'Response was [%r].' % resp)
+ LOGGER.debug(msg + 'About to disconnect the web socket.')
+ self.ari_client.disconnect_websocket()
+
+
+def main():
+ """Entry point for the test.
+
+ Returns:
+ 0 if the test passed, 1 otherwise.
+ """
+
+ test = StasisStatusTestCase(build_scenarios)
+
+ if test.passed:
+ return 0
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())
+
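
build_scenarios() resolves each scenario class by name through globals(). One
alternative, shown here purely as a sketch, is an explicit registry dict. All
class and function names below are hypothetical stand-ins and the
constructors are simplified; the real scenarios also take the AMI instance,
the ARI client and the monitor.

```python
class Scenario(object):
    """Stand-in for the TestScenario base class (hypothetical)."""

    expected = None

    def __init__(self, name):
        self.name = name


class BabsScenario(Scenario):
    expected = 'SUCCESS'


class BugsScenario(Scenario):
    expected = 'FAILED'


class BusterScenario(Scenario):
    expected = 'FAILED'


# An explicit registry keeps the set of buildable scenarios visible in
# one place instead of relying on a naming convention in globals().
SCENARIO_CLASSES = {
    'Babs': BabsScenario,
    'Bugs': BugsScenario,
    'Buster': BusterScenario,
}


def build(names):
    """Instantiate one scenario per name, preserving order."""
    return [SCENARIO_CLASSES[name](name) for name in names]


for scenario in build(['Babs', 'Bugs', 'Buster']):
    print('{0}: {1}'.format(scenario.name, scenario.expected))
```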
diff --git a/tests/rest_api/applications/stasisstatus/test-config.yaml b/tests/rest_api/applications/stasisstatus/test-config.yaml
new file mode 100644
index 0000000..1574959
--- /dev/null
+++ b/tests/rest_api/applications/stasisstatus/test-config.yaml
@@ -0,0 +1,32 @@
+testinfo:
+ summary: |
+ Tests that Stasis correctly applies the STASISSTATUS channel
+ variable.
+ description: |
+ The test exercises Stasis in three ways to determine if it is
+ functioning as advertised.
+ - For the first case (Babs), a channel is originated under normal
+ conditions and then the channel is hung up. For this case, the
+ test verifies that Stasis correctly assigns SUCCESS to STASISSTATUS.
+ - For the second case (Bugs), a channel is originated using an app
+ that was never registered with Stasis. The test verifies that Stasis
+ correctly assigns FAILED to STASISSTATUS.
+ - For the third case (Buster), a channel is made under normal
+ conditions, but, before the channel is hung up and while the channel is
+ still active, the websocket is disconnected. A channel is then
+ originated using the app that was just unregistered. For this case,
+ the test verifies that Stasis correctly assigns FAILED to STASISSTATUS.
+
+properties:
+ minversion: '13.4.0'
+ dependencies:
+ - python: 'autobahn.websocket'
+ - python: 'requests'
+ - python: 'twisted'
+ - python: 'starpy'
+ - asterisk: 'res_ari_applications'
+ - asterisk: 'res_ari_channels'
+ tags:
+ - ARI
+ issues:
+ - jira: 'ASTERISK-24802'
diff --git a/tests/rest_api/applications/stasisstatus/test_case.py b/tests/rest_api/applications/stasisstatus/test_case.py
new file mode 100644
index 0000000..1e8c1dd
--- /dev/null
+++ b/tests/rest_api/applications/stasisstatus/test_case.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python
+"""
+Copyright (C) 2015, Digium, Inc.
+Ashley Sanders <asanders at digium.com>
+
+This program is free software, distributed under the terms of
+the GNU General Public License Version 2.
+"""
+
+import sys
+import logging
+
+sys.path.append("lib/python")
+sys.path.append("tests/rest_api/applications")
+
+from asterisk.test_case import TestCase
+from twisted.internet import reactor, defer
+
+LOGGER = logging.getLogger(__name__)
+
+
+class StasisStatusTestCase(TestCase):
+ """The test case.
+
+ This class serves as a harness for the test scenarios. It manages the
+ life-cycle of the objects needed to execute the test plan.
+ """
+
+ def __init__(self, scenario_builder):
+ """Constructor.
+
+ scenario_builder -- The builder to use for constructing
+ the test scenarios.
+
+ """
+
+ super(StasisStatusTestCase, self).__init__()
+
+ self.create_asterisk()
+
+ self.__host = self.ast[0].host
+ self.__port = 8088
+ self.__credentials = ('testsuite', 'testsuite')
+
+ self.__scenarios = list()
+ self.__iterator = None
+ self.__builder = scenario_builder
+
+ reactor.run()
+ return
+
+ def __format__(self, format_spec):
+ """Overrides default format handling for 'self'."""
+
+ return self.__class__.__name__ + ':'
+
+ def ami_connect(self, ami):
+ """Handler for the AMI connect event.
+
+ Keyword Arguments:
+ ami -- The AMI instance that raised this event.
+ """
+
+ super(StasisStatusTestCase, self).ami_connect(ami)
+ self.__initialize_scenarios(ami)
+
+ def __get_next_scenario(self):
+ """Gets the next scenario from the list."""
+
+ scenario = None
+ try:
+ scenario = next(self.__iterator)
+ except StopIteration:
+ pass
+ return scenario
+
+ def __initialize_scenarios(self, ami):
+ """Initializes the scenarios.
+
+ Keyword Arguments:
+ ami -- The AMI instance for this test.
+ """
+
+ deferred = defer.Deferred()
+ self.__scenarios = self.__builder(ami,
+ self.__host,
+ self.__port,
+ self.__credentials)
+ self.__iterator = iter(self.__scenarios)
+
+ for scenario in self.__scenarios:
+ deferred.addCallback(self.__try_run_scenario)
+
+ deferred.callback(self.__get_next_scenario())
+
+ def on_reactor_timeout(self):
+ """Called when the reactor times out"""
+
+ LOGGER.warn("{0} Reactor is timing out. Setting test to FAILED.".format(self))
+ self.set_passed(False)
+
+ def __on_scenario_complete(self, sender, message):
+ """Queries the scenarios to determine if it is time to shut down
+ the test.
+
+ Keyword Arguments:
+ sender -- The object that raised the event.
+ message -- The event payload.
+ """
+
+ sender.stop()
+ for scenario in self.__scenarios:
+ if not scenario.finished:
+ return
+
+ LOGGER.debug('{0} Test case execution is complete.'.format(self))
+ self.stop_reactor()
+
+ def run(self):
+ """Executes the test case.
+
+ Tries to set up the state needed by the test. If successful, the test
+ is executed and then the test state is torn down."""
+
+ LOGGER.debug('{0} Starting test case execution.'.format(self))
+ super(StasisStatusTestCase, self).run()
+
+ self.create_ami_factory()
+
+ def stop_reactor(self):
+ """Clean up actions to perform prior to shutting down the reactor.
+
+ Queries the scenarios for their pass/fail state to determine
+ overall pass/fail state for the test. Then, destroys the test state
+ before stopping the reactor."""
+
+ LOGGER.debug('{0} Stopping reactor.'.format(self))
+ for scenario in self.__scenarios:
+ self.set_passed(scenario.passed)
+ if not scenario.clean:
+ scenario.stop()
+ super(StasisStatusTestCase, self).stop_reactor()
+ LOGGER.debug('{0} Reactor stopped.'.format(self))
+
+ def __try_run_scenario(self, scenario):
+ """Starts the stasis scenario.
+
+ Keyword Arguments:
+ scenario -- The scenario to try to start.
+
+ Returns:
+ If the self.__iterator has not yet finished traversing the list,
+ returns the next scenario in self.__scenarios.
+
+ Otherwise, returns None.
+ """
+
+ msg = '{0} {1} scenario [{2}]'
+
+ if scenario is not None:
+ LOGGER.debug((msg + '.').format(self,
+ 'Starting',
+ scenario.name))
+ scenario.start(self.__on_scenario_complete)
+ return self.__get_next_scenario()
+
+ msg = msg + '; {3}.'
+ LOGGER.warn(msg.format(self,
+ 'Cannot connect',
+ None,
+ 'scenario has not been assigned a value.'))
+ return None
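The chaining in __initialize_scenarios can be easier to follow with a reduced model. The sketch below is not the testsuite code and does not use Twisted: run_chain is a hypothetical stand-in for a fired Deferred, which hands each callback the previous callback's return value, and ScenarioStarter is a hypothetical stand-in for StasisStatusTestCase. It assumes scenarios are plain names rather than TestScenario objects.

```python
def run_chain(callbacks, initial):
    """Pass each callback the previous callback's return value.

    Hypothetical stand-in for how a fired Deferred walks its callbacks.
    """
    result = initial
    for callback in callbacks:
        result = callback(result)
    return result


class ScenarioStarter(object):
    """Hypothetical stand-in for StasisStatusTestCase."""

    def __init__(self, names):
        self.started = []
        self._iterator = iter(names)

    def get_next(self):
        # Yields None once the iterator is exhausted.
        return next(self._iterator, None)

    def try_run(self, name):
        # Mirrors __try_run_scenario: start the scenario that was handed
        # in, then return the next one for the following callback.
        if name is None:
            return None
        self.started.append(name)
        return self.get_next()


names = ['Babs', 'Bugs', 'Buster']
starter = ScenarioStarter(names)
# One callback per scenario, as in the for loop over self.__scenarios.
callbacks = [starter.try_run for _ in names]
final = run_chain(callbacks, starter.get_next())
```

Under these assumptions every scenario is started once, in list order, and the chain ends with None. Because each start call returns without waiting for its scenario to finish, the chain orders the starts only; completion is tracked separately through the on_complete observers.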
diff --git a/tests/rest_api/applications/stasisstatus/test_scenario.py b/tests/rest_api/applications/stasisstatus/test_scenario.py
new file mode 100644
index 0000000..57e16d4
--- /dev/null
+++ b/tests/rest_api/applications/stasisstatus/test_scenario.py
@@ -0,0 +1,265 @@
+#!/usr/bin/env python
+"""
+Copyright (C) 2015, Digium, Inc.
+Ashley Sanders <asanders at digium.com>
+
+This program is free software, distributed under the terms of
+the GNU General Public License Version 2.
+"""
+
+
+import sys
+import logging
+
+sys.path.append("lib/python")
+sys.path.append("tests/rest_api/applications")
+
+from abc import ABCMeta, abstractmethod
+from stasisstatus.observable_object import ObservableObject
+
+LOGGER = logging.getLogger(__name__)
+
+
+class TestScenario(ObservableObject):
+ """The test scenario.
+
+ This class is responsible for presenting a facade around the
+ AriClient and TestStrategy objects.
+ """
+
+ __metaclass__ = ABCMeta
+
+ def __init__(self, ami, ari_client, monitor, expected, name='testsuite'):
+ """Constructor.
+
+ Keyword Arguments:
+ ami -- The AMI instance for this TestScenario.
+ ari_client -- The AriClient to use for executing the
+ TestStrategy commands.
+ monitor -- The ChannelVariableMonitor instance for this
+ TestScenario.
+ expected -- The expected value for this TestScenario.
+ name -- The name for this TestScenario instance
+ (optional) (default 'testsuite').
+ """
+
+ super(TestScenario, self).__init__(name, ['on_complete',
+ 'on_stop'])
+ self.__ami = ami
+ self.__ari_client = ari_client
+ self.__actual_value = None
+ self.__expected_value = expected
+ self.__monitor = monitor
+ self.__passed = None
+
+ self.__monitor.suspend()
+ self.ari_client.register_observers('on_client_start',
+ self.on_ari_client_start)
+ self.ari_client.register_observers('on_client_stop',
+ self.on_ari_client_stop)
+ self.monitor.register_observers('on_value_changed',
+ self.on_monitor_on_value_changed)
+
+ def compile_results(self):
+ """Compiles the results after executing the test strategy."""
+
+ if self.finished:
+ return
+
+ LOGGER.debug('{0} Compiling the results.'.format(self))
+
+ passed = self.actual_value == self.expected_value
+
+ LOGGER.debug('{0} Test strategy is complete.'.format(self))
+ LOGGER.debug('{0} Test values: Expected [{1}]; Actual [{2}].' \
+ .format(self, self.expected_value, self.actual_value))
+ LOGGER.debug('{0} Test results: Test {1}.' \
+ .format(self, 'Passed' if passed else 'Did Not Pass'))
+
+ self.passed = passed
+
+ def finish_scenario(self):
+ """Performs the final tasks."""
+
+ if self.finished:
+ LOGGER.debug('{0} Scenario is already finished.'.format(self))
+
+ self.suspend()
+ LOGGER.debug('{0} Finishing the scenario.'.format(self))
+ self.compile_results()
+
+ def on_ari_client_start(self, sender, message):
+ """Handles the AriClient on_client_start event.
+
+ Keyword Arguments:
+ sender -- The object that raised the event.
+ message -- The event payload.
+ """
+
+ LOGGER.debug('{0} AriClient started successfully.'.format(self))
+ if not self.suspended:
+ LOGGER.debug('{0} Running scenario.'.format(self))
+ self.run_strategy()
+
+ def on_ari_client_stop(self, sender, message):
+ """Handler for the AriClient on_client_stop event.
+
+ Keyword Arguments:
+ sender -- The object that raised the event.
+ message -- The event payload.
+ """
+
+ LOGGER.debug('{0} Scenario has stopped.'.format(self))
+ self.notify_observers('on_stop', None, True)
+
+ def on_monitor_on_value_changed(self, sender, message):
+ """Handles the ChannelVariableMonitor 'on_value_changed' event.
+
+ Keyword Arguments:
+ sender -- The object that raised the event.
+ message -- The event payload.
+ """
+
+ msg = '{0} '.format(self)
+
+ if self.suspended:
+ LOGGER.debug(msg + 'Scenario is suspended.')
+ return
+
+ self.actual_value = self.monitor.captured_value
+
+ if self.actual_value == self.expected_value:
+ self.suspend()
+ LOGGER.debug('{0} Looks like we made it.'.format(self))
+ self.finish_scenario()
+
+ def resume(self):
+ """Overrides the default behavior of resetting the value of the
+ suspended flag."""
+
+ if not self.suspended:
+ return
+
+ super(TestScenario, self).resume()
+ self.__monitor.resume()
+
+ @abstractmethod
+ def run_strategy(self):
+ """Runs the Test Scenario."""
+
+ return
+
+ def start(self, on_scenario_complete=None):
+ """Starts the test scenario.
+
+ Keyword Arguments:
+ on_scenario_complete -- A callback (or a list of callbacks) to invoke
+ after the scenario completes (optional)
+ (default None).
+ """
+
+ LOGGER.debug('{0} Starting scenario.'.format(self))
+ self.register_observers('on_complete', on_scenario_complete)
+ self.ari_client.start()
+
+ def stop(self, on_scenario_stop=None):
+ """Stops the scenario execution and tears down its state.
+
+ Keyword Arguments:
+ on_scenario_stop -- A callback (or a list of callbacks) to invoke
+ after the scenario stops (optional)
+ (default None).
+ """
+
+ if self.ari_client.suspended:
+ return
+
+ LOGGER.debug('{0} Stopping the scenario.'.format(self))
+ self.register_observers('on_stop', on_scenario_stop)
+ self.suspend()
+ self.ari_client.stop()
+
+ def suspend(self):
+ """Overrides the default behavior of setting the value of the
+ suspended flag."""
+
+ if self.suspended:
+ return
+
+ super(TestScenario, self).suspend()
+ self.__monitor.suspend()
+
+ @property
+ def actual_value(self):
+ """The actual value for this TestScenario."""
+
+ return self.__actual_value
+
+ @actual_value.setter
+ def actual_value(self, value):
+ """Sets the actual value for this TestScenario."""
+
+ self.__actual_value = value
+
+ @property
+ def ami(self):
+ """The AMI instance for this TestScenario."""
+
+ return self.__ami
+
+ @property
+ def ari_client(self):
+ """The AriClient instance for this TestScenario."""
+
+ return self.__ari_client
+
+ @property
+ def clean(self):
+ """Flag indicating that this scenario has been torn down."""
+
+ return self.ari_client.clean
+
+ @property
+ def expected_value(self):
+ """The expected value for this TestScenario."""
+
+ return self.__expected_value
+
+ @property
+ def finished(self):
+ """Whether or not the strategy for this scenario has completed
+ execution.
+
+ Returns:
+ True if the strategy has completed execution, False otherwise.
+ """
+
+ return self.__passed is not None
+
+ @property
+ def monitor(self):
+ """The ChannelVariableMonitor instance."""
+
+ return self.__monitor
+
+ @property
+ def passed(self):
+ """The state of the strategy.
+
+ Returns:
+ False if the test strategy has not completed. Otherwise, True if the
+ test strategy was successful, False if it was not.
+ """
+
+ return False if not self.finished else self.__passed
+
+ @passed.setter
+ def passed(self, value):
+ """Safely set the passed variable for this scenario."""
+
+ if self.__passed is False:
+ return
+
+ self.__passed = value
+ self.notify_observers('on_complete', None, True)
+ return
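The interplay between the finished and passed properties and the passed setter can be shown in isolation. This is a minimal sketch, assuming a hypothetical Outcome class that keeps only the flag logic of TestScenario and replaces notify_observers with a counter. It is not part of the change itself.

```python
class Outcome(object):
    """Hypothetical reduction of the pass/fail state in TestScenario."""

    def __init__(self):
        self._passed = None      # None means "not finished yet"
        self.notifications = 0   # stand-in for notify_observers calls

    @property
    def finished(self):
        return self._passed is not None

    @property
    def passed(self):
        # An unfinished scenario reports False rather than None.
        return self._passed if self.finished else False

    @passed.setter
    def passed(self, value):
        # A recorded failure is sticky: later writes are ignored.
        if self._passed is False:
            return
        self._passed = value
        self.notifications += 1


outcome = Outcome()
before = (outcome.finished, outcome.passed)
outcome.passed = False
outcome.passed = True   # ignored, the failure has already been recorded
after = (outcome.finished, outcome.passed)
```

In this model a failure cannot be overwritten by a later success, and observers are notified only for writes that are actually applied.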
diff --git a/tests/rest_api/applications/tests.yaml b/tests/rest_api/applications/tests.yaml
index 3a9c104..5d5c4be 100644
--- a/tests/rest_api/applications/tests.yaml
+++ b/tests/rest_api/applications/tests.yaml
@@ -6,3 +6,4 @@
- test: 'subscribe-device-state'
- test: 'double-subscribe-device-state'
- dir: 'channel-subscriptions'
+ - test: 'stasisstatus'
--
To view, visit https://gerrit.asterisk.org/18
To unsubscribe, visit https://gerrit.asterisk.org/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I0f7dadfd429bd30e9f07a531f47884d8c923fc13
Gerrit-PatchSet: 10
Gerrit-Project: testsuite
Gerrit-Branch: master
Gerrit-Owner: Ashley Sanders <asanders at digium.com>
Gerrit-Reviewer: Ashley Sanders <asanders at digium.com>
Gerrit-Reviewer: Mark Michelson <mmichelson at digium.com>
Gerrit-Reviewer: Matt Jordan <mjordan at digium.com>