[asterisk-commits] Testsuite: Refactored 'rls validation' Module for Easier Deb... (testsuite[master])
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Nov 12 10:54:44 CST 2015
Anonymous Coward #1000019 has submitted this change and it was merged.
Change subject: Testsuite: Refactored 'rls_validation' Module for Easier Debugging
......................................................................
Testsuite: Refactored 'rls_validation' Module for Easier Debugging
While this does not actually correct the issue with the test failure (which,
incidentally, turns out not to be localized to this test), this patch modifies
the test validator by separating the concerns for each validation method,
improving the logging, and fixing pylint issues.
This is part four (4) of an n-patch series of refactorings to help determine the
root cause of the test failure and correct pylint issues.
ASTERISK-25430
Reported By: Ashley Sanders
Change-Id: Icb8af439f5c3db3efb0a93d6402ab4dcacdf3731
---
A tests/channels/pjsip/subscriptions/rls/rls_element.py
D tests/channels/pjsip/subscriptions/rls/rls_integrity.py
M tests/channels/pjsip/subscriptions/rls/rls_test.py
A tests/channels/pjsip/subscriptions/rls/rls_validation.py
4 files changed, 1,314 insertions(+), 437 deletions(-)
Approvals:
Anonymous Coward #1000019: Verified
Matt Jordan: Looks good to me, approved
Joshua Colp: Looks good to me, but someone else must approve
diff --git a/tests/channels/pjsip/subscriptions/rls/rls_element.py b/tests/channels/pjsip/subscriptions/rls/rls_element.py
new file mode 100644
index 0000000..862d4c3
--- /dev/null
+++ b/tests/channels/pjsip/subscriptions/rls/rls_element.py
@@ -0,0 +1,1129 @@
+#!/usr/bin/env python
+"""
+Copyright (C) 2015, Digium, Inc.
+Jonathan Rose <jrose at digium.com>
+Ashley Sanders <asanders at digium.com>
+
+This program is free software, distributed under the terms of
+the GNU General Public License Version 2.
+"""
+
+import sys
+import logging
+import xml.etree.ElementTree as ET
+
+sys.path.append('lib/python')
+sys.path.append('tests/channels/pjsip/subscriptions/rls')
+
+from abc import ABCMeta, abstractmethod
+from rls_validation import ValidationInfo
+
+LOGGER = logging.getLogger(__name__)
+
+
+class RLSElement(object):
+ """Base class for an RLS element."""
+
+ __metaclass__ = ABCMeta
+
+ def __init__(self, data):
+ """Constructor.
+
+ keyword Arguments:
+ data -- The raw element from the packet message body.
+ """
+
+ self.data = data
+ self.children = []
+ self.content_id = {}
+ self.rlmi_content_id = {}
+
+ def handle_error(self, reason=None):
+ """Handler for validation errors.
+
+ Keyword Arguments:
+ reason -- The failure reason. (Optional.)
+
+ Returns:
+ False.
+ """
+
+ self.reset()
+ if reason is not None:
+ LOGGER.error(reason)
+ return False
+
+ def count_parts(self, packet_type):
+ """Count the number of child elements of a given type.
+
+ Keyword Arguments:
+ packet_type -- The type of element to query.
+
+ Returns:
+ The count of elements matching the provided packet type.
+ """
+
+ parts = self.data.body.parts
+ return sum([1 for x in parts if x.body.packet_type == packet_type])
+
+ def get_rlmi(self):
+ """Helper method to get the RLMI part for this RLSElement instance.
+
+ Returns:
+ The RLMI part of the message, if successful. Otherwise, returns None.
+ """
+
+ for part in self.data.body.parts:
+ if part.body.packet_type == "RLMI":
+ return part
+ return None
+
+ def validate_element(self, element, info):
+ """Validates a single body element with its appropriate validator.
+
+ Keyword Arguments:
+ element -- The packet body element to validate.
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the component matched expectations. Otherwise, returns False.
+ """
+
+ rls_element = None
+ rls_type = element.body.packet_type
+
+ if rls_type == "RLMI":
+ rls_element = RMLI(element.body.list_elem)
+ elif rls_type == "PIDF":
+ rls_element = PIDF(element.body)
+ elif rls_type == "MWI":
+ rls_element = MWI(element.body)
+ elif rls_type == "Multipart":
+ rls_element = Multipart(element)
+ else:
+ message = (
+ "Validation failed. Received unrecognized body part packet "
+ "type of '{0}'.").format(rls_type)
+ return self.handle_error(message)
+
+ if not rls_element.validate(info):
+ return False
+
+ self.children.append(rls_element)
+ return True
+
+ def reset(self):
+ """Resets the state of this RLSElement instance."""
+
+ self.content_id = {}
+ self.children = []
+
+ @abstractmethod
+ def validate(self, info):
+ """Validates the integrity of this RLSElement instance.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if this RLSElement instance is valid. Otherwise, returns False.
+ """
+
+ pass
+
+ @property
+ def resource_cids(self):
+ """The Content-ID elements for this RLSElement instance and its children."""
+
+ content_ids = dict(self.content_id)  # copy: reading must not mutate state
+ for child in self.children:
+ content_ids.update(child.resource_cids)
+ return content_ids
+
+ @property
+ def rlmi_cids(self):
+ """The RLMI Content-ID elements for this RLSElement and its children."""
+
+ rlmi_cids = dict(self.rlmi_content_id)  # copy: reading must not mutate state
+ for child in self.children:
+ rlmi_cids.update(child.rlmi_cids)
+ return rlmi_cids
+
+class Multipart(RLSElement):
+ """General class that validates a multipart body of an RLS packet."""
+
+ def __init__(self, multi_part):
+ """Constructor.
+
+ Keyword Arguments:
+ multi_part -- The Multipart part from a multipart body.
+ """
+
+ super(Multipart, self).__init__(multi_part)
+
+ def validate(self, info):
+ """Validates the integrity of a Multipart body.
+
+ This filters down through parts within the multipart body
+ in order to recursively evaluate each element contained
+ within.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if this Multipart instance is valid. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Validating multipart body...")
+
+ content_id = self.data.content_id
+ rlmi = self.get_rlmi()
+
+ if not self.__validate_content_id(content_id):
+ return False
+
+ if not self.__validate_rlmi_element(rlmi):
+ return False
+
+ self.content_id[rlmi.body.list_elem.uri] = content_id
+ name = rlmi.body.list_elem.name[0].valueOf_
+ next_resource = info.resources.get(name)
+
+ if not self.__validate_next_resource(next_resource, name):
+ return False
+
+ resources = next_resource["sublist"]  # safe: resource validated above
+ return self.__validate_children(info, resources, rlmi, name)
+
+ def __validate_children(self, info, resources, rlmi, name):
+ """Validates the children elements of a Multipart body.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+ resources -- The 'sublist' element of the next_resource
+ for the Multipart body.
+ rlmi -- The raw RLMI section of the Multipart body.
+ name -- The name of the next resource.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ multipart_info = ValidationInfo(resources,
+ info.version,
+ info.fullstate,
+ rlmi,
+ name)
+
+ for part in self.data.body.parts:
+ if not self.validate_element(part, multipart_info):
+ return self.handle_error()
+
+ return True
+
+ def __validate_content_id(self, content_id):
+ """Verifies the Multipart body contains exactly one Content-ID element.
+
+ Keyword Arguments:
+ content_id -- The raw Content-ID element.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing multipart body -- Inspecting "
+ "'Content-ID' element...")
+
+ if not content_id:
+ message = (
+ "Processing multipart body -- Validation check failed. "
+ "Multipart does not have a 'Content-ID' element.")
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing multipart body -- Validation check "
+ "passed. Received expected 'Content-ID' element.")
+
+ return True
+
+ def __validate_next_resource(self, next_resource, name):
+ """Verifies the named resource is expected and is of type 'Multipart'.
+
+ Keyword Arguments:
+ next_resource -- The next resource in the expectations list.
+ name -- The expected name of the next resource.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing multipart body -- Inspecting body for: "
+ "resource: ({0})...".format(name))
+
+ if not next_resource:
+ message = (
+ "Processing multipart body -- Validation check failed. "
+ "Received unexpected resource ({0}) in RLMI message "
+ "body.").format(name)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing multipart body -- Validation check "
+ "passed. Resource ({0}) is an expected "
+ "resource.".format(name))
+
+ # Verifying next resource type is 'Multipart'
+ LOGGER.debug("Processing multipart body -- Inspecting resource "
+ "({0}) type attribute. Expecting: "
+ "'Multipart'...".format(name))
+
+ next_type = next_resource["type"]
+ if next_type != "Multipart":
+ message = (
+ "Processing multipart body -- Validation check failed. "
+ "Expected packet type to be 'Multipart' but received "
+ "({0}).").format(next_type)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing multipart body -- Validation check "
+ "passed. Received expected packet type.")
+
+ return True
+
+ def __validate_rlmi_element(self, rlmi):
+ """Verifies the Multipart body contains an RLMI part.
+
+ Keyword Arguments:
+ rlmi -- The raw RLMI elements.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing multipart body -- Inspecting RLMI part...")
+
+ if rlmi is None:
+ message = (
+ "Processing multipart body -- Validation check failed. "
+ "Multipart does not contain expected RLMI part.")
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing multipart body -- Validation check "
+ "passed. Received expected RLMI part.")
+
+ return True
+
+class MWI(RLSElement):
+ """General class that validates an MWI body from an RLS packet."""
+
+ def __init__(self, mwi_body):
+ """Constructor.
+
+ Keyword Arguments:
+ mwi_body -- The MWI body from a multipart body.
+ """
+
+ super(MWI, self).__init__(mwi_body)
+
+ def validate(self, info):
+ """Validates the integrity of an MWI body.
+
+ This uses the RLMI that included the MWI body such that its
+ name and URI can be determined and linked to the appropriate
+ resource.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if this MWI instance is valid. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Validating MWI body ...")
+
+ if not self.__validate_content_id(self.data.content_id):
+ return False
+ # The RLMI must be checked before it is dereferenced in the loop below.
+ if not self.__validate_rlmi_body(info):
+ return False
+
+ res_name = None
+ res_uri = None
+ for resource in info.rlmi.body.list_elem.resource:
+ if resource.instance[0].cid == self.data.content_id:
+ res_name = resource.name[0].valueOf_
+ res_uri = resource.uri
+ break
+
+ if not self.__validate_resource_instance(res_name, res_uri):
+ return False
+ if not self.__validate_mwi_resource(info, res_name):
+ return False
+
+ self.content_id[res_uri] = self.data.content_id
+ return True
+
+ def __validate_content_id(self, content_id):
+ """Verifies the MWI body contains a Content-ID element.
+
+ Keyword Arguments:
+ content_id -- The raw Content-ID element.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing MWI body -- Inspecting 'Content-ID' "
+ "element...")
+
+ if not content_id:
+ message = (
+ "Processing MWI body -- Validation check failed. MWI body "
+ "does not contain a Content-ID.")
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing MWI body -- Validation check passed. MWI "
+ "body contains a Content-ID.")
+
+ return True
+
+ def __validate_mwi_resource(self, info, name):
+ """Validates the MWI resource of an MWI body.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+ name -- The name of the MWI resource instance.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing MWI body -- Inspecting MWI resource...")
+
+ relevant_resource = info.resources.get(name)
+
+ if not relevant_resource:
+ message = (
+ "Processing MWI body -- Validation check failed. MWI '{0}' "
+ "not specified in expected resources.").format(name)
+ return self.handle_error(message)
+
+ resource_type = relevant_resource["type"]
+
+ if resource_type != "MWI":
+ message = (
+ "Processing MWI body -- Validation check failed. Resource "
+ "type ({0}) isn't an MWI type.").format(resource_type)
+ return self.handle_error(message)
+
+ expected_vm = relevant_resource["voice_message"]  # from expectations
+ actual_vm = self.data.voice_message  # from the received body
+
+ if actual_vm != expected_vm:
+ message = (
+ "Processing MWI body -- Validation check failed. Received "
+ "Voice-Message header ({0}) doesn't match expected "
+ "({1}).").format(actual_vm, expected_vm)
+ return self.handle_error(message)
+
+ expected_msgs_waiting = relevant_resource["messages_waiting"]
+ actual_msgs_waiting = self.data.messages_waiting
+
+ if actual_msgs_waiting != expected_msgs_waiting:
+ message = (
+ "Processing MWI body -- Validation check failed. Received "
+ "Messages-Waiting header ({0}) doesn't match expected "
+ "({1}).").format(actual_msgs_waiting, expected_msgs_waiting)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing MWI body -- Validation check passed for MWI "
+ "resource ({0}).".format(relevant_resource))
+
+ return True
+
+ def __validate_resource_instance(self, name, uri):
+ """Validates the RLMI body has a resource instance for this MWI body.
+
+ Keyword Arguments:
+ name -- The name of the MWI resource instance.
+ uri -- The URI of the MWI resource instance.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing MWI body -- Inspecting RLMI body resource "
+ "instances...")
+
+ if not name:
+ message = (
+ "Processing MWI body -- Validation check failed. "
+ "Couldn't find MWI body with Content ID '{0}' in "
+ "RLMI body.").format(self.data.content_id)
+ return self.handle_error(message)
+
+ if not uri:
+ message = (
+ "Processing MWI body -- Validation check failed. URI not "
+ "found for resource '{0}' in RLMI body.").format(name)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing MWI body -- Validation check passed. RLMI "
+ "body contains a resource instance for this MWI body.")
+
+ return True
+
+ def __validate_rlmi_body(self, info):
+ """Validates the MWI body contains an RLMI body.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing MWI body -- Inspecting RLMI body...")
+
+ if not info.rlmi:
+ message = (
+ "Processing MWI body -- Validation check failed. MWI "
+ "part does not contain an RLMI body.")
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing MWI body -- Validation check passed. MWI "
+ "part contains an RLMI body.")
+
+ return True
+
+class PIDF(RLSElement):
+ """General class that validates the PIDF body of an RLS packet."""
+
+ def __init__(self, pidf_body):
+ """Constructor.
+
+ Keyword Arguments:
+ pidf_body -- The PIDF body from a multipart body.
+ """
+
+ super(PIDF, self).__init__(pidf_body)
+
+ def validate(self, info):
+ """Validates the integrity of a PIDF body.
+
+ This uses XML ElementTree to parse the PIDF body and ensures basic
+ structural elements (as they relate to RLS) are present.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if this PIDF instance is valid. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Validating PIDF body ...")
+
+ if not self.__validate_content_id(self.data.content_id):
+ return False
+
+ uri = self.__try_parse_uri()
+ if not uri:
+ return False
+
+ self.content_id[uri] = self.data.content_id
+ return True
+
+ def __validate_content_id(self, content_id):
+ """Verifies the PIDF body contains exactly one Content-ID element.
+
+ Keyword Arguments:
+ content_id -- The raw Content-ID element.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing PIDF body -- Inspecting 'Content-ID' "
+ "element...")
+
+ if not content_id:
+ message = (
+ "Processing PIDF body -- Validation check failed. "
+ "PIDF body does not contain a Content-ID.")
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing PIDF body -- Validation check passed. "
+ "PIDF body contains a Content-ID.")
+
+ return True
+
+ def __try_parse_uri(self):
+ """Tries to parse the URI from the PIDF XML.
+
+ Returns:
+ The URI if successful. Otherwise, returns False.
+ """
+
+ try:
+ root = ET.fromstring(self.data.xml)
+ except Exception as ex:
+ message = (
+ "Processing PIDF body -- Validation check failed. "
+ "Exception when parsing PIDF XML: {0}.").format(ex)
+ return self.handle_error(message)
+
+ entity = root.get("entity")
+ if not entity:
+ message = (
+ "Processing PIDF body -- Validation check failed. "
+ "PIDF document root has no entity element.")
+ return self.handle_error(message)
+
+ return entity.strip("<>")
+
+class RMLI(RLSElement):
+ """General class that validates the RLMI part of an RLS packet."""
+
+ def __init__(self, list_elem):
+ """Constructor.
+
+ Keyword Arguments:
+ list_elem -- The XML <list> element in the RLMI body, as
+ parsed by lxml.
+
+
+ """
+
+ super(RMLI, self).__init__(list_elem)
+
+ def validate(self, info):
+ """Validates an RLMI document.
+
+ This method checks the integrity of the list element and calls
+ into a helper method to check the integrity of each resource
+ element in the list.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if this RMLI instance is valid. Otherwise, returns
+ False.
+ """
+
+ LOGGER.debug("Validating RLMI list element...")
+
+ #if not self.__validate_version(info):
+ # return False
+
+ if not self.__validate_fullstate(info):
+ return False
+
+ if not self.__validate_name():
+ return False
+
+ if not self.__validate_resources(info):
+ return False
+
+ if not self.__validate_rlmi_name(info):
+ return False
+
+ return self.__validate_rlmi_resources(info)
+
+ def __validate_fullstate(self, info):
+ """Verifies the RLMI body contains the expected fullstate value.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLMI list element -- Inspecting RLMI "
+ "fullstate value. Expecting: "
+ "{0}...".format(info.fullstate))
+
+ list_fullstate = self.data.get_fullState()
+
+ if list_fullstate != info.fullstate:
+ message = (
+ "Processing RLMI list element -- Validation "
+ "check failed. Received unexpected RLMI fullState "
+ "value ({0}).").format(list_fullstate)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI list element -- Validation check "
+ "passed. Received expected RLMI fullstate value.")
+ return True
+
+ def __validate_name(self):
+ """Verifies the RLMI body contains exactly one name element.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLMI list element -- Inspecting number "
+ "of 'name' elements. Expecting: 1.")
+
+ list_name = self.data.get_name()
+
+ if len(list_name) != 1:
+ message = (
+ "Processing RLMI list element -- Validation "
+ "check failed. Received unexpected number of "
+ "'name' elements ({0}).").format(len(list_name))
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI list element -- Validation check "
+ "passed. Received expected number of 'name' elements.")
+
+ return True
+
+ def __validate_resources(self, info):
+ """Verifies the RLMI body contains the expected number of resources.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLMI list element -- Inspecting number "
+ "of 'resource' elements. Expecting: "
+ "{0}.".format(len(info.resources)))
+
+ list_resources = self.data.resource
+ res_count = len(list_resources)
+
+ if res_count != len(info.resources):
+ message = (
+ "Processing RLMI list element -- Validation "
+ "check failed. Received unexpected number of "
+ "'resource' elements ({0}).").format(res_count)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI list element -- Validation check "
+ "passed. Received expected number of 'resource' "
+ "elements.")
+
+ return True
+
+ def __validate_rlmi_resources(self, info):
+ """Validates each RLMI resources from the RLMI body.
+
+ This method checks the integrity of the each RLMI resource element in
+ the RLMI list.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLMI list element -- Validating children "
+ "RLMI resource elements.")
+
+ for resource in self.data.resource:
+ rlmi_resource = RLMIResource(resource)
+ if not rlmi_resource.validate(info):
+ return self.handle_error()
+ self.children.append(rlmi_resource)
+ return True
+
+ def __validate_rlmi_name(self, info):
+ """Verifies the RLMI list name matches the expected RLMI name.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLMI list element -- Inspecting RLMI list "
+ "name. Expecting: {0}.".format(info.rlmi_name))
+
+ actual_name = self.data.get_name()[0].get_valueOf_()
+
+ if actual_name != info.rlmi_name:
+ message = (
+ "Processing RLMI list element -- Validation check "
+ "failed. Received unexpected RLMI list name "
+ "({0}).").format(actual_name)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI list element -- Validation check "
+ "passed. Received expected RLMI list name.")
+
+ return True
+
+ def __validate_version(self, info):
+ """Verifies the RLMI body contains the expected version value.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLMI list element -- Inspecting RLMI "
+ "version. Expecting: {0}...".format(info.version))
+
+ list_version = self.data.get_version()
+
+ if list_version != info.version:
+ message = (
+ "Processing RLMI list element -- Validation check failed. "
+ "Received unexpected RLMI version ({0}).").format(list_version)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI list element -- Validation check "
+ "passed. Received expected RLMI version.")
+
+ return True
+
+class RLMIResource(RLSElement):
+ """General class that validates an RLMI resource."""
+
+ def __init__(self, rlmi_resource):
+ """Constructor.
+
+ Keyword Arguments:
+ rlmi_resource -- The XML <resource> element in the RLMI
+ <list>, as parsed by lxml.
+ """
+
+ super(RLMIResource, self).__init__(rlmi_resource)
+
+ def validate(self, info):
+ """Validate an RLMI resource.
+
+ This method checks the integrity of a resource XML element within an
+ RLMI list.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Validating RLMI resource...")
+
+ if not self.__validate_uri():
+ return False
+
+ if not self.__validate_name(info):
+ return False
+
+ if not self.__validate_instance(info):
+ return False
+
+ self.rlmi_content_id[self.data.uri] = self.data.instance[0].cid
+ return True
+
+ def __validate_instance(self, info):
+ """Validates the instance attribute of this RLMI resource.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLMI resource -- Inspecting number of "
+ "'instance' elements. Expecting: 1.")
+
+ instances = len(self.data.instance)
+ if instances != 1:
+ message = (
+ "Processing RLMI resource -- Validation check "
+ "failed. Received unexpected number of 'instance' "
+ "elements ({0}) in RLMI resource.").format(instances)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI resource -- Validation check passed. "
+ "Received expected number of 'instance' elements.")
+
+ # Validate RLMI resource instance attribute
+ LOGGER.debug("Processing RLMI resource -- Inspecting RLMI resource "
+ "instance attributes: state, id, and cid...")
+
+ resource_instance = self.data.instance[0]
+
+ if not resource_instance.state:
+ message = (
+ "Processing RLMI resource -- Validation check "
+ "failed. Resource instance has no state.")
+ return self.handle_error(message)
+ if not resource_instance.id:
+ message = (
+ "Processing RLMI resource -- Validation check "
+ "failed. Resource instance has no id.")
+ return self.handle_error(message)
+ if not resource_instance.cid:
+ message = (
+ "Processing RLMI resource -- Validation check "
+ "failed. Resource instance has no cid.")
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI resource -- Validation check passed. "
+ "Received values for 'instance' element attributes: "
+ "state, id and cid.")
+
+ # Validate the instance state matches the expected value
+ name = self.data.get_name()[0].get_valueOf_()
+ state = info.resources[name]["state"]
+ LOGGER.debug("Processing RLMI resource -- Inspecting instance "
+ "state. Expecting: {0}...".format(state))
+
+ if resource_instance.state != state:
+ message = (
+ "Processing RLMI resource -- Validation check failed. "
+ "Received unexpected instance state "
+ "({0})).").format(resource_instance.state)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI resource -- Validation check passed. "
+ "Received expected value for the RLMI instance state.")
+
+ return True
+
+ def __validate_name(self, info):
+ """Validates the name for this RLMI resource.
+
+ The method first ensures the RLMI resource contains only one name
+ element. Then, inspects the value of the name element to ensure it
+ was an expected resource.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLMI resource -- Inspecting number of "
+ "'name' elements. Expecting: 1.")
+
+ resource_name = self.data.get_name()
+
+ if len(resource_name) != 1:
+ message = (
+ "Processing RLMI resource -- Validation check "
+ "failed. Received unexpected number of 'name' "
+ "elements ({0}).").format(len(resource_name))
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI resource -- Validation check "
+ "passed. Received expected number of 'name' elements.")
+
+ LOGGER.debug("Processing RLMI resource -- Inspecting RLMI "
+ "resource name...")
+
+ name = resource_name[0].get_valueOf_()
+
+ if name not in info.resources:
+ message = (
+ "Processing RLMI resource -- Validation check "
+ "failed. Received unexpected RLMI resource name "
+ "({0}).").format(name)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI list element -- Validation check "
+ "passed. Received expected RLMI resource name.")
+
+ return True
+
+ def __validate_uri(self):
+ """Validates the URI attribute of this RLMI resource.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLMI resource -- Inspecting 'uri' "
+ "attribute... ")
+
+ if not self.data.uri:
+ message = (
+ "Processing RLMI resource -- Validation check "
+ "failed. RLMI resource is missing its 'uri' attribute.")
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLMI resource -- Validation check "
+ "passed. Received expected 'uri' attribute.")
+ return True
+
+class RLSPacket(RLSElement):
+ """General class that validates a multipart RLS NOTIFY body."""
+
+ def __init__(self, packet):
+ """Constructor.
+
+ Keyword Arguments:
+ packet -- The Multipart NOTIFY body in full.
+ """
+
+ super(RLSPacket, self).__init__(packet)
+
+ def validate(self, info):
+ """Validates a multipart RLS packet.
+
+ If the multipart body does not pass validation, then the test will
+ fail. If this method returns at all, it means that the body passed
+ validation.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the multipart RLS packet matches the expectations provided at
+ construction. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Validating RLS packet...")
+
+ if not self.__validate_part_counts(info):
+ return False
+
+ if not self.__validate_children(info):
+ return False
+
+ return self.__validate_content_ids()
+
+ def __validate_part_counts(self, info):
+ """Validates the body and RLMI part counts of an RLS packet.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ LOGGER.debug("Processing RLS packet -- Inspecting number of "
+ "body parts received. Expecting: "
+ "{0}...".format(len(info.resources)))
+
+ body_parts = len(self.data.body.parts)
+
+ if body_parts != len(info.resources) + 1:
+ message = (
+ "Processing RLS packet -- Validation check failed. "
+ "Received unexpected number of parts ({0}) in "
+ "multipart body.").format(body_parts - 1)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLS packet -- Validation check passed. "
+ "Received expected number of body parts.")
+
+ # Verify there is exactly 1 RLMI part
+ LOGGER.debug("Processing RLS packet -- Inspecting number of RMLI "
+ "parts received. Expecting: 1...")
+
+ rlmi_parts = self.count_parts("RLMI")
+
+ if rlmi_parts != 1:
+ message = (
+ "Processing RLS packet -- Validation check failed. "
+ "Received unexpected number of RLMI parts ({0}) in "
+ "multipart body.").format(rlmi_parts)
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLS packet -- Validation check passed. "
+ "Received expected number of RMLI parts.")
+
+ return True
+
+ def __validate_children(self, info):
+ """Validates the children elements of an RLS packet.
+
+ Keyword Arguments:
+ info -- The ValidationInfo instance containing data
+ for this validation session.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ rlmi = self.get_rlmi()
+ rls_info = ValidationInfo(info.resources,
+ info.version,
+ info.fullstate,
+ rlmi,
+ info.rlmi_name)
+
+ for part in self.data.body.parts:
+ if not self.validate_element(part, rls_info):
+ return self.handle_error()
+
+ return True
+
+ def __validate_content_ids(self):
+ """Validates the Content-IDs of an RLS packet.
+
+ Returns:
+ True if the validation is successful. Otherwise, returns False.
+ """
+
+ resource_cids = self.resource_cids
+ rlmi_cids = self.rlmi_cids
+
+ LOGGER.debug("Processing RLS packet -- Inspecting number of "
+ "Content-IDs received. Expecting: "
+ "{0}...".format(len(resource_cids)))
+
+ if len(rlmi_cids) != len(resource_cids):
+ message = (
+ "Processing RLS packet -- Validation check failed. "
+ "Gathered mismatching number of Content IDs. RLMI "
+ "document has {0} Content IDs but was expected to "
+ "have {1}.").format(len(rlmi_cids),
+ len(resource_cids))
+ return self.handle_error(message)
+
+ LOGGER.debug("Processing RLS packet -- Validation check passed. "
+ "Received expected number of Content IDs.")
+
+ for uri, cid in rlmi_cids.items():  # items(): works on Python 2 and 3
+ if uri not in resource_cids:
+ message = (
+ "Processing RLS packet -- Validation check "
+ "failed. URI {0} not found within the RLS packet "
+ "Content-ID elements.").format(uri)
+ return self.handle_error(message)
+
+ resource_cid = resource_cids.get(uri)
+ if resource_cid != cid:
+ message = (
+ "Processing RLS packet -- Validation check failed. "
+ "Mismatching Content-ID for URI ({0}). RLMI document has "
+ "({1}). Document has ({2})").format(uri,
+ cid,
+ resource_cid)
+ return self.handle_error(message)
+ return True
diff --git a/tests/channels/pjsip/subscriptions/rls/rls_integrity.py b/tests/channels/pjsip/subscriptions/rls/rls_integrity.py
deleted file mode 100755
index 5d44f05..0000000
--- a/tests/channels/pjsip/subscriptions/rls/rls_integrity.py
+++ /dev/null
@@ -1,381 +0,0 @@
-#/usr/bin/env python
-"""
-Copyright (C) 2015, Digium, Inc.
-Jonathan Rose <jrose at digium.com>
-
-This program is free software, distributed under the terms of
-the GNU General Public License Version 2.
-"""
-
-import logging
-import xml.etree.ElementTree as ET
-
-LOGGER = logging.getLogger(__name__)
-
-
-def count_parts(parts, packet_type):
- """Count the number of parts of a particular type in a multipart body"""
- return sum([1 for x in parts if x.body.packet_type == packet_type])
-
-
-class RLSValidator(object):
- """General class that validates a multipart RLS NOTIFY body"""
- def __init__(self, test_object, packet, version, full_state, list_name,
- resources):
- """Constructor
-
- Arguments:
- test_object The test object for the running test.
- packet The Multipart NOTIFY body in full.
- version The expected RLMI version attribute. Expressed as an integer.
- full_state The expected RLMI fullState attribute. Expressed as a
- boolean.
- list_name The expected RLMI name element value.
- packet_type The type of body parts to expect other than RLMI.
- resources A dictionary of the resource names and their expected state.
- """
- super(RLSValidator, self).__init__()
- self.test_object = test_object
- self.packet = packet
- self.version = version
- self.full_state = full_state
- self.list_name = list_name
- self.resources = resources
- self.rlmi_cids = {}
- self.resource_cids = {}
-
- def validate_body_part(self, part, resources, rlmi, list_name):
- """Validates a single body part against the its packet type validator
-
- Note: Will mark the test as failed if the packet does not match
- expectations
-
- Returns:
- True if the component matched expectations
- False if the component did not match expectations
- """
- if part.body.packet_type == 'RLMI':
- return self.validate_rlmi(part.body.list_elem, resources,
- list_name)
- elif part.body.packet_type == 'PIDF':
- return self.validate_pidf(part.body, resources)
- elif part.body.packet_type == 'MWI':
- return self.validate_mwi(part.body, resources, rlmi)
- elif part.body.packet_type == 'Multipart':
- return self.validate_multipart(part, resources)
-
- msg = "Unrecognized body part packet type of '{0}'"
- self.fail_test(msg.format(part.body.packet_type))
- return False
-
- def check_integrity(self):
- """Validates a multipart RLS body
-
- If the multipart body does not pass validation, then the test will
- fail. If this method returns at all, it means that the body passed
- validation.
-
- Returns:
- True if the multipart RLS body matches the expectations provided at
- construction
- False if the multipart RLS body does not match the expectations
- """
- rlmi = None
-
- # Number of resources plus an RLMI part
- if len(self.packet.body.parts) != len(self.resources) + 1:
- self.fail_test("Unexpected number of parts (%d) in multipart body"
- % len(self.packet.body.parts))
- return False
-
- rlmi_parts = count_parts(self.packet.body.parts, 'RLMI')
- resource_parts = len(self.packet.body.parts) - 1
-
- if rlmi_parts != 1:
- self.fail_test("Unexpected number of RLMI parts (%d) in multipart"
- "body" % rlmi_parts)
- return False
-
- if resource_parts != len(self.resources):
- self.fail_test("Unexpected number of parts (%d) in multipart"
- "body" % resource_parts)
- return False
-
- for part in self.packet.body.parts:
- if part.body.packet_type == 'RLMI':
- rlmi = part
- break
-
- for part in self.packet.body.parts:
- x = self.validate_body_part(part, self.resources, rlmi,
- self.list_name)
- if not x:
- return False
-
- if len(self.rlmi_cids) != len(self.resource_cids):
- self.fail_test("Gathered mismatching number of Content IDs. RLMI"
- "document has %d. Should have %d" %
- (len(self.rlmi_cids), len(self.resource_cids)))
- return False
-
- for uri, cid in self.rlmi_cids.iteritems():
- if uri not in self.resource_cids:
- self.fail_test("URI not found in %s documents" %
- (uri))
- return False
- if self.resource_cids.get(uri) != cid:
- self.fail_test("Mismatching Content ID for URI %s. RLMI"
- "document has %s. Document has %s" %
- (uri, cid, self.resource_cids.get(uri)))
- return False
-
- return True
-
- def validate_rlmi(self, list_elem, resources, list_name):
- """Validate an RLMI document
-
- This method checks the integrity of the list element and calls
- into a helper method to check the integrity of each resource
- element in the list.
-
- Arguments:
- list_elem The XML <list> element in the RLMI body, as parsed by lxml
- resources The expected resources dictionary relevant to this RLMI body
- """
-
- if list_elem.get_version() != self.version:
- self.fail_test("Unexpected RLMI version %d" % list_elem.version)
- return False
-
- if list_elem.get_fullState() != self.full_state:
- self.fail_test("Unexpected fullState value %s" %
- str(list_elem.fullState))
- return False
-
- name = list_elem.get_name()
- if len(name) != 1:
- self.fail_test("Unexpected number of names (%d) in RLMI list" %
- len(list_elem.name))
- return False
-
- if len(list_elem.resource) != len(resources):
- self.fail_test("Unexpected number of resources (%d) in RLMI list" %
- len(list_elem.resource))
- return False
-
- if name[0].get_valueOf_() != list_name:
- self.fail_test("Unexpected list name: %s" %
- list_elem.name[0].value())
- return False
-
- for resource in list_elem.resource:
- if not self.validate_rlmi_resource(resource, resources):
- return False
- return True
-
- def validate_rlmi_resource(self, rlmi_resource, resources):
- """Validate an RLMI resource
-
- This method checks the integrity of a resource XML element within an
- RLMI list.
-
- Arguments:
- rlmi_resource The XML <resource> element in the RLMI <list>, as parsed
- by lxml
- resources The expected resources dictionary relevant to this RLMI
- resource
- """
- if not rlmi_resource.uri:
- self.fail_test("Resource is missing a URI")
- return False
-
- name = rlmi_resource.get_name()
- if len(name) != 1:
- self.fail_test("Unexpected number of names (%d) in resource" %
- len(rlmi_resource.name))
- return False
-
- if len(rlmi_resource.instance) != 1:
- self.fail_test("Unexpeced number of instances (%d) in resource" %
- len(rlmi_resource.instance))
- return False
-
- name_sought = name[0].valueOf_
- if name_sought not in resources:
- self.fail_test("Unexpected resource name %s" % name)
- return False
-
- instance = rlmi_resource.instance[0]
- if not instance.state:
- self.fail_test("Resource instance has no state")
- return False
- if not instance.id:
- self.fail_test("Resource instance has no id")
- return False
- if not instance.cid:
- self.fail_test("Resource instance has no cid")
- return False
-
- if instance.state != resources[name_sought]['state']:
- self.fail_test("Unexpected instance state %s" % instance.state)
- return False
-
- self.rlmi_cids[rlmi_resource.uri] = rlmi_resource.instance[0].cid
- return True
-
- def validate_pidf(self, pidf_part, resources):
- """Validates the integrity of a PIDF body
-
- This uses XML ElementTree to parse the PIDF body and ensures basic
- structural elements (as they relate to RLS) are present.
-
- Arguments:
- pidf_part The PIDF part from a multipart body.
- resources The expected resources dictionary relevant to this PIDF body
-
- Returns:
- True if the pidf_part matched the expectations in resources
- False if the pidf_part did not match the expectations in resources
- """
-
- if not pidf_part.content_id:
- self.fail_test("PIDF part does not have a Content-ID")
- return False
-
- try:
- root = ET.fromstring(pidf_part.xml)
- except Exception as ex:
- self.fail_test("Exception when parsing PIDF XML: %s" % ex)
- return False
-
- entity = root.get('entity')
- if not entity:
- self.fail_test("PIDF document root has no entity")
- return False
-
- stripped_entity = entity.strip('<>')
- self.resource_cids[stripped_entity] = pidf_part.content_id
- return True
-
- def validate_mwi(self, mwi_part, resources, rlmi):
- """Validates the integrity of an MWI body
-
- this uses the rlmi that included the mwi body so that its
- name and URI can be determined and linked to the appropriate
- resource.
-
- Arguments:
- mwi_part The MWI part from a multipart body
- resources The expected resources dictionary relevant to this MWI body
- rlmi The rlmi that described the WMI body
-
- Returns:
- True if the mwi_part matched the expectations in resources
- False if the mwi_part did not match the expectations in resources
- """
-
- if not mwi_part.content_id:
- self.fail_test("MWI part does not have a Content-ID")
- return False
-
- if not rlmi:
- self.fail_test("MWI part has now RLMI body")
- return False
-
- my_name = None
- my_uri = None
-
- for resource in rlmi.body.list_elem.resource:
- if resource.instance[0].cid == mwi_part.content_id:
- my_name = resource.name[0].valueOf_
- my_uri = resource.uri
- break
-
- if not my_name:
- self.fail_test("Couldn't find MWI part with Content ID '%s' in "
- "MWI body" % mwi_part.content_id)
- return False
-
- if not my_uri:
- self.fail_test("URI not found for resource '%s'" % my_name)
- return False
-
- relevant_resource = resources.get(my_name)
- if not relevant_resource:
- self.fail_test("MWI '%s' not specified in expected resources" %
- my_name)
- return False
-
- if relevant_resource['type'] != 'MWI':
- self.fail_test("Resource expected type isn't an MWI type.")
- return False
-
- if relevant_resource['voice_message'] != mwi_part.voice_message:
- self.fail_test("Voice-Message header doesn't match expectations")
- return False
-
- if relevant_resource['messages_waiting'] != mwi_part.messages_waiting:
- self.fail_test("Messages-Waiting header doesn't match " +
- "expectations")
- return False
-
- self.resource_cids[my_uri] = mwi_part.content_id
- return True
-
- def validate_multipart(self, multi_part, resources):
- """Validates the integrity of a Multipart body
-
- This filters down through parts within the multipart body
- in order to recursively evaluate each element contained
- within.
-
- Arguments:
- multi_part The Multipart part from a multipart body.
- resources The expected resources dictionary relevant for this
- multipart body. May be the full list specified by the test
- or a deeper node.
-
- Returns:
- True if the multi_part body matches the expectations in resources
- False if the multi_part body does not match the expectations
- """
- rlmi = None
-
- if not multi_part.content_id:
- self.fail_test("Multipart does not have a Content-ID")
- return False
-
- for part in multi_part.body.parts:
- if part.body.packet_type == 'RLMI':
- rlmi = part
- name = part.body.list_elem.name[0].valueOf_
- uri = part.body.list_elem.uri
-
- self.resource_cids[uri] = multi_part.content_id
-
- next_resources = resources.get(name)
- if not next_resources:
- self.fail_test("Missing '%s'" % name)
- return False
-
- if next_resources['type'] != 'Multipart':
- self.fail_test("Packet Type is wrong -- processing multipart, "
- "but expected type is %s" % next_resources['type'])
- return False
-
- next_resources = next_resources['sublist']
-
- for part in multi_part.body.parts:
- if not self.validate_body_part(part, next_resources, rlmi, name):
- return False
- return True
-
- def fail_test(self, message):
- """Mark the test as failed and stop the reactor
-
- Arguments:
- message Reason for the test failure"""
- LOGGER.error(message)
- self.test_object.set_passed(False)
- self.test_object.stop_reactor()
-
diff --git a/tests/channels/pjsip/subscriptions/rls/rls_test.py b/tests/channels/pjsip/subscriptions/rls/rls_test.py
index f0b64e2..9d41fbe 100755
--- a/tests/channels/pjsip/subscriptions/rls/rls_test.py
+++ b/tests/channels/pjsip/subscriptions/rls/rls_test.py
@@ -10,14 +10,78 @@
import sys
import logging
-sys.path.append('lib/python')
-sys.path.append('tests/channels/pjsip/subscriptions/rls')
+sys.path.append("lib/python")
+sys.path.append("tests/channels/pjsip/subscriptions/rls")
from pcap import VOIPListener
-from rls_integrity import RLSValidator
+from rls_element import RLSPacket
+from rls_validation import ValidationInfo
from twisted.internet import reactor
LOGGER = logging.getLogger(__name__)
+
+def filter_multipart_packet(packet):
+    """Tells whether a packet is a SIP NOTIFY carrying a multipart body.
+
+    Keyword Arguments:
+    packet                  -- A yappcap.PcapPacket
+
+    Returns:
+    True if this is a multipart NOTIFY packet. Otherwise, returns False.
+    """
+
+    # The body is only inspected once the packet is known to be a NOTIFY.
+    rejection = None
+    if "NOTIFY" not in packet.request_line:
+        rejection = "Ignoring packet, is not a NOTIFY"
+    elif packet.body.packet_type != "Multipart":
+        rejection = ("Ignoring packet, NOTIFY does not contain "
+                     "multipart body")
+
+    if rejection is not None:
+        LOGGER.debug(rejection)
+        return False
+
+    return True
+
+
+def log_packet(packet, write_packet_contents):
+    """Writes the contents of a SIP packet to the log.
+
+    Every name reported by dir(packet) is written as 'name = value'. The
+    'body' attribute is additionally expanded, one tab-indented line per
+    name reported by dir(body). The result is emitted as a single debug
+    message.
+
+    Keyword Arguments:
+    packet                  -- A yappcap.PcapPacket
+    write_packet_contents   -- Whether or not to dump the contents of the
+                               packet to the log.
+    """
+
+    # Bail out before doing any work when packet dumping is disabled.
+    if not write_packet_contents:
+        return
+
+    lines = ["Received SIP packet:"]
+
+    for attr in dir(packet):
+        value = getattr(packet, attr)
+        # Each attribute is reported exactly once.
+        lines.append("%s = %s" % (attr, value))
+        if attr == "body":
+            lines.append("body:")
+            for body_attr in dir(value):
+                lines.append("\t%s = %s" % (body_attr,
+                                            getattr(value, body_attr)))
+
+    # Keep the newline-terminated layout of the original message.
+    LOGGER.debug("\n".join(lines) + "\n")
+
+
+def set_pcap_defaults(module_config):
+    """Fills in the PcapListener options that are missing or falsy.
+
+    Keyword Arguments:
+    module_config           -- The module configuration dictionary. It is
+                               updated in place.
+    """
+
+    defaults = (("bpf-filter", "udp port 5061"),
+                ("register-observer", True),
+                ("debug-packets", True),
+                ("device", "lo"))
+
+    for option, fallback in defaults:
+        if not module_config.get(option):
+            module_config[option] = fallback
class RLSTest(VOIPListener):
@@ -37,41 +101,72 @@
test_object -- Used to manipulate reactor and set/remove
failure tokens.
"""
- self.set_pcap_defaults(module_config)
- VOIPListener.__init__(self, module_config, test_object)
+
+ set_pcap_defaults(module_config)
+ super(RLSTest, self).__init__(module_config, test_object)
self.test_object = test_object
- self.token = test_object.create_fail_token("Haven't handled all "
- "expected NOTIFY packets.")
+ self.ami_action = module_config.get("ami-action")
-
- self.list_name = module_config['list-name']
- self.log_packets = module_config.get("log-packets", False)
- self.packets = module_config['packets']
- self.ami_action = module_config.get('ami-action')
- self.stop_test_after_notifys = \
- module_config.get("stop-test-after-notifys", True)
-
+ if self.check_prerequisites():
+ self.test_object.register_scenario_started_observer(
+ self.on_scenario_started)
self.ami = None
self.packets_idx = 0
- self.test_object.register_ami_observer(self.ami_connect)
- if hasattr(self.test_object, 'register_scenario_started_observer'):
- self.test_object.register_scenario_started_observer(
- self.scenario_started)
- self.add_callback('SIP', self.multipart_handler)
+ self.list_name = module_config["list-name"]
+ self.log_packets = module_config.get("log-packets", False)
+ self.packets = module_config["packets"]
- def set_pcap_defaults(self, module_config):
- """Sets default values for the PcapListener
+ self.stop_test_after_notifys = \
+ module_config.get("stop-test-after-notifys", True)
+
+ token_msg = "Test execution was interrupted before all NOTIFY " \
+ "packets were accounted for."
+ self.token = self.test_object.create_fail_token(token_msg)
+
+ self.test_object.register_ami_observer(self.on_ami_connect)
+ self.add_callback("SIP", self.multipart_handler)
+
+ def check_prerequisites(self):
+ """Checks the test_object can support an ami_action, if configured.
+
+ Note: This method will fail the test if it is determined that the
+ test has a dependency on an ami_action but uses a test object that
+ does not have a definition for 'register_scenario_started_observer'.
+
+ Returns:
+ True if the test object supports 'register_scenario_started_observer'.
+ Otherwise, returns False.
"""
- if not module_config.get('bpf-filter'):
- module_config['bpf-filter'] = 'udp port 5061'
- if not module_config.get('register-observer'):
- module_config['register-observer'] = True
- if not module_config.get('debug-packets'):
- module_config['debug-packets'] = True
- if not module_config.get('device'):
- module_config['device'] = 'lo'
+
+ is_start_observer = hasattr(self.test_object,
+ "register_scenario_started_observer")
+ # An ami_action is only an error when the test object cannot run it.
+ if self.ami_action is not None and not is_start_observer:
+ message = "This test is configured with an ami_action " \
+ "attribute. However, it is also configured to " \
+ "use a test-object that does not contain support " \
+ "for 'register_scenario_started_observer'. As a " \
+ "result, the ami_action will never be executed. " \
+ "Either reconfigure the test to run without a " \
+ "dependency for an ami_action or change the " \
+ "test-object type to a type that supports " \
+ "'register_scenario_started_observer'."
+ self.fail_test(message)
+
+ return is_start_observer
+
+ def fail_test(self, message):
+     """Marks the test as failed and stops the reactor.
+
+     Logs the failure reason, replaces any outstanding fail token with one
+     that carries the reason, marks the test as failed and stops the
+     reactor.
+
+     Keyword Arguments:
+     message                -- Reason for the test failure
+     """
+
+     LOGGER.error(message)
+     # The constructor runs check_prerequisites() -- which may call this
+     # method -- before it creates self.token, so there may be no token to
+     # remove yet.
+     stale_token = getattr(self, "token", None)
+     if stale_token is not None:
+         self.test_object.remove_fail_token(stale_token)
+     self.token = self.test_object.create_fail_token(message)
+     self.test_object.set_passed(False)
+     self.test_object.stop_reactor()
def multipart_handler(self, packet):
"""Handles incoming SIP packets and verifies contents
@@ -85,39 +180,36 @@
packet -- Incoming SIP Packet
"""
- LOGGER.debug('Received SIP packet')
+ log_packet(packet, self.log_packets)
- if 'NOTIFY' not in packet.request_line:
- LOGGER.debug('Ignoring packet, is not a NOTIFY')
- return
-
- if packet.body.packet_type != 'Multipart':
- LOGGER.debug('Ignoring packet, NOTIFY does not contain ' +
- 'multipart body')
+ if not filter_multipart_packet(packet):
return
if self.packets_idx >= len(self.packets):
- LOGGER.debug('Ignoring packet, version is higher than count of ' +
- 'test expectations')
- return
+ message = "Received more packets ({0}) than expected ({1}). " \
+ "Failing test.".format(self.packets_idx,
+ len(self.packets))
+ self.fail_test(message)
+ rls_packet = RLSPacket(packet)
resources = self.packets[self.packets_idx]["resources"]
- full_state = self.packets[self.packets_idx]["full-state"]
+ fullstate = self.packets[self.packets_idx]["full-state"]
- validator = RLSValidator(test_object=self.test_object,
- packet=packet,
- version=self.packets_idx,
- full_state=full_state,
- list_name=self.list_name,
- resources=resources)
+ info = ValidationInfo(resources=resources,
+ version=self.packets_idx,
+ fullstate=fullstate,
+ rlmi=None,
+ rlmi_name=self.list_name)
- debug_msg = "validating packet -- expecting {0}"
- LOGGER.debug(debug_msg.format(self.packets[self.packets_idx]))
- if not validator.check_integrity():
- LOGGER.error('Integrity Check Failed.')
+ message = "Validating Resource ({0}) of ({1})..."
+ LOGGER.debug(message.format(self.packets_idx, len(self.packets) - 1))
+
+ if not rls_packet.validate(info):
+ message = "Integrity Check Failed for Resource ({0})."
+ self.fail_test(message.format(self.packets_idx))
return
- info_msg = "Packet validated successfully. Test Phase {0} Completed."
+ info_msg = "Resource ({0}) validated successfully."
LOGGER.info(info_msg.format(self.packets_idx))
self.packets_idx += 1
@@ -125,17 +217,17 @@
info_msg = "All test phases completed. RLS verification complete."
LOGGER.info(info_msg)
self.test_object.remove_fail_token(self.token)
- if self.stop_after_notifys:
+ if self.stop_test_after_notifys:
# We only deal with as many NOTIFIES as we have defined in our
# test-config.yaml
self.test_object.set_passed(True)
self.test_object.stop_reactor()
- def ami_connect(self, ami):
+ def on_ami_connect(self, ami):
"""Callback when AMI connects. Sets test AMI instance."""
self.ami = ami
- def scenario_started(self, scenario):
+ def on_scenario_started(self, scenario):
"""Callback when SIPp scenario has started.
If this test executes AMI actions, this function will execute
diff --git a/tests/channels/pjsip/subscriptions/rls/rls_validation.py b/tests/channels/pjsip/subscriptions/rls/rls_validation.py
new file mode 100755
index 0000000..7f30264
--- /dev/null
+++ b/tests/channels/pjsip/subscriptions/rls/rls_validation.py
@@ -0,0 +1,37 @@
+#/usr/bin/env python
+"""
+Copyright (C) 2015, Digium, Inc.
+Ashley Sanders <asanders at digium.com>
+
+This program is free software, distributed under the terms of
+the GNU General Public License Version 2.
+"""
+
+import sys
+
+sys.path.append('lib/python')
+sys.path.append('tests/channels/pjsip/subscriptions/rls')
+
+
+class ValidationInfo(object):
+    """Plain container for the expectations used to validate an RLS packet."""
+
+    def __init__(self, resources, version, fullstate, rlmi, rlmi_name):
+        """Stores the supplied values on the instance, unmodified.
+
+        Keyword Arguments:
+        resources              -- A dictionary of the resource names and
+                                  their expected state.
+        version                -- The expected RLMI version attribute.
+                                  Expressed as an integer.
+        fullstate              -- The expected RLMI fullState attribute.
+                                  Expressed as a boolean.
+        rlmi                   -- The RLMI part of the packet.
+        rlmi_name              -- The expected RLMI name element value.
+        """
+
+        # Expectations for the packet as a whole.
+        self.resources, self.version = resources, version
+        self.fullstate = fullstate
+        # The RLMI part and the list name it is expected to carry.
+        self.rlmi, self.rlmi_name = rlmi, rlmi_name
--
To view, visit https://gerrit.asterisk.org/1537
To unsubscribe, visit https://gerrit.asterisk.org/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Icb8af439f5c3db3efb0a93d6402ab4dcacdf3731
Gerrit-PatchSet: 4
Gerrit-Project: testsuite
Gerrit-Branch: master
Gerrit-Owner: Ashley Sanders <asanders at digium.com>
Gerrit-Reviewer: Anonymous Coward #1000019
Gerrit-Reviewer: Ashley Sanders <asanders at digium.com>
Gerrit-Reviewer: Jonathan Rose <jrose at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Matt Jordan <mjordan at digium.com>
More information about the asterisk-commits
mailing list