[Asterisk-code-review] Fix pep8 and flake8 issues. (starpy[master])
Corey Farrell
asteriskteam at digium.com
Mon May 14 08:40:45 CDT 2018
Corey Farrell has uploaded this change for review. ( https://gerrit.asterisk.org/8990
Change subject: Fix pep8 and flake8 issues.
......................................................................
Fix pep8 and flake8 issues.
* Update examples to be compatible with python3.
* Address pep8 and flake8 issues doc/, starpy/ and examples/.
* Fix undefined name keytree in fastagi:FastAGIProtocol.databaseDeltree.
* Set maximum line length to 90.
Change-Id: I293635427a48e66298042c3fc1e600e47854d3ed
---
M doc/pydoc/builddocs.py
M doc/pydoc/pydoc2.py
M examples/amicommand.py
M examples/autosurvey/frontend.py
M examples/calldurationcallback.py
M examples/connecttoivr.py
M examples/connecttoivrapp.py
M examples/fastagisetvariable.py
M examples/getvariable.py
M examples/hellofastagi.py
M examples/hellofastagiapp.py
M examples/menu.py
M examples/menutest.py
M examples/priexhaustion.py
M examples/priexhaustionbare.py
M examples/readingdigits.py
M examples/timestamp.py
M examples/timestampapp.py
M examples/utilapplication.py
M starpy/fastagi.py
M starpy/manager.py
M tox.ini
22 files changed, 1,496 insertions(+), 1,343 deletions(-)
git pull ssh://gerrit.asterisk.org:29418/starpy refs/changes/90/8990/1
diff --git a/doc/pydoc/builddocs.py b/doc/pydoc/builddocs.py
index 080f7c6..9372eb1 100755
--- a/doc/pydoc/builddocs.py
+++ b/doc/pydoc/builddocs.py
@@ -2,26 +2,25 @@
import pydoc2
if __name__ == "__main__":
- excludes = [
- "Numeric",
- "_tkinter",
- "Tkinter",
- "math",
- "string",
- "twisted",
- ]
- stops = [
- ]
+ excludes = [
+ "Numeric",
+ "_tkinter",
+ "Tkinter",
+ "math",
+ "string",
+ "twisted",
+ ]
+ stops = [
+ ]
- modules = [
- 'starpy',
- 'starpy.examples',
- '__builtin__',
- ]
- pydoc2.PackageDocumentationGenerator(
- baseModules = modules,
- destinationDirectory = ".",
- exclusions = excludes,
- recursionStops = stops,
- ).process ()
-
+ modules = [
+ 'starpy',
+ 'starpy.examples',
+ '__builtin__',
+ ]
+ pydoc2.PackageDocumentationGenerator(
+ baseModules=modules,
+ destinationDirectory=".",
+ exclusions=excludes,
+ recursionStops=stops,
+ ).process()
diff --git a/doc/pydoc/pydoc2.py b/doc/pydoc/pydoc2.py
index 8fc4d33..cc85c6a 100644
--- a/doc/pydoc/pydoc2.py
+++ b/doc/pydoc/pydoc2.py
@@ -1,463 +1,462 @@
"""Pydoc sub-class for generating documentation for entire packages"""
-import pydoc, inspect, os, string
-import sys, imp, os, stat, re, types, inspect
-from repr import Repr
-from string import expandtabs, find, join, lower, split, strip, rfind, rstrip
+import pydoc
+import inspect
+import os
+import string
+import sys
+from string import join, split, strip
+
def classify_class_attrs(cls):
- """Return list of attribute-descriptor tuples.
+ """Return list of attribute-descriptor tuples.
- For each name in dir(cls), the return list contains a 4-tuple
- with these elements:
+ For each name in dir(cls), the return list contains a 4-tuple
+ with these elements:
- 0. The name (a string).
+ 0. The name (a string).
- 1. The kind of attribute this is, one of these strings:
- 'class method' created via classmethod()
- 'static method' created via staticmethod()
- 'property' created via property()
- 'method' any other flavor of method
- 'data' not a method
+ 1. The kind of attribute this is, one of these strings:
+ 'class method' created via classmethod()
+ 'static method' created via staticmethod()
+ 'property' created via property()
+ 'method' any other flavor of method
+ 'data' not a method
- 2. The class which defined this attribute (a class).
+ 2. The class which defined this attribute (a class).
- 3. The object as obtained directly from the defining class's
- __dict__, not via getattr. This is especially important for
- data attributes: C.data is just a data object, but
- C.__dict__['data'] may be a data descriptor with additional
- info, like a __doc__ string.
-
- Note: This version is patched to work with Zope Interface-bearing objects
- """
+ 3. The object as obtained directly from the defining class's
+ __dict__, not via getattr. This is especially important for
+ data attributes: C.data is just a data object, but
+ C.__dict__['data'] may be a data descriptor with additional
+ info, like a __doc__ string.
- mro = inspect.getmro(cls)
- names = dir(cls)
- result = []
- for name in names:
- # Get the object associated with the name.
- # Getting an obj from the __dict__ sometimes reveals more than
- # using getattr. Static and class methods are dramatic examples.
- if name in cls.__dict__:
- obj = cls.__dict__[name]
- else:
- try:
- obj = getattr(cls, name)
- except AttributeError, err:
- continue
+ Note: This version is patched to work with Zope Interface-bearing objects
+ """
- # Figure out where it was defined.
- homecls = getattr(obj, "__objclass__", None)
- if homecls is None:
- # search the dicts.
- for base in mro:
- if name in base.__dict__:
- homecls = base
- break
+ mro = inspect.getmro(cls)
+ names = dir(cls)
+ result = []
+ for name in names:
+ # Get the object associated with the name.
+ # Getting an obj from the __dict__ sometimes reveals more than
+ # using getattr. Static and class methods are dramatic examples.
+ if name in cls.__dict__:
+ obj = cls.__dict__[name]
+ else:
+ try:
+ obj = getattr(cls, name)
+ except AttributeError as err:
+ continue
- # Get the object again, in order to get it from the defining
- # __dict__ instead of via getattr (if possible).
- if homecls is not None and name in homecls.__dict__:
- obj = homecls.__dict__[name]
+ # Figure out where it was defined.
+ homecls = getattr(obj, "__objclass__", None)
+ if homecls is None:
+ # search the dicts.
+ for base in mro:
+ if name in base.__dict__:
+ homecls = base
+ break
- # Also get the object via getattr.
- obj_via_getattr = getattr(cls, name)
+ # Get the object again, in order to get it from the defining
+ # __dict__ instead of via getattr (if possible).
+ if homecls is not None and name in homecls.__dict__:
+ obj = homecls.__dict__[name]
- # Classify the object.
- if isinstance(obj, staticmethod):
- kind = "static method"
- elif isinstance(obj, classmethod):
- kind = "class method"
- elif isinstance(obj, property):
- kind = "property"
- elif (inspect.ismethod(obj_via_getattr) or
- inspect.ismethoddescriptor(obj_via_getattr)):
- kind = "method"
- else:
- kind = "data"
+ # Also get the object via getattr.
+ obj_via_getattr = getattr(cls, name)
- result.append((name, kind, homecls, obj))
+ # Classify the object.
+ if isinstance(obj, staticmethod):
+ kind = "static method"
+ elif isinstance(obj, classmethod):
+ kind = "class method"
+ elif isinstance(obj, property):
+ kind = "property"
+ elif (inspect.ismethod(obj_via_getattr) or
+ inspect.ismethoddescriptor(obj_via_getattr)):
+ kind = "method"
+ else:
+ kind = "data"
- return result
+ result.append((name, kind, homecls, obj))
+
+ return result
inspect.classify_class_attrs = classify_class_attrs
class DefaultFormatter(pydoc.HTMLDoc):
- def docmodule(self, object, name=None, mod=None, packageContext = None, *ignored):
- """Produce HTML documentation for a module object."""
- name = object.__name__ # ignore the passed-in name
- parts = split(name, '.')
- links = []
- for i in range(len(parts)-1):
- links.append(
- '<a href="%s.html"><font color="#ffffff">%s</font></a>' %
- (join(parts[:i+1], '.'), parts[i]))
- linkedname = join(links + parts[-1:], '.')
- head = '<big><big><strong>%s</strong></big></big>' % linkedname
- try:
- path = inspect.getabsfile(object)
- url = path
- if sys.platform == 'win32':
- import nturl2path
- url = nturl2path.pathname2url(path)
- filelink = '<a href="file:%s">%s</a>' % (url, path)
- except TypeError:
- filelink = '(built-in)'
- info = []
- if hasattr(object, '__version__'):
- version = str(object.__version__)
- if version[:11] == '$' + 'Revision: ' and version[-1:] == '$':
- version = strip(version[11:-1])
- info.append('version %s' % self.escape(version))
- if hasattr(object, '__date__'):
- info.append(self.escape(str(object.__date__)))
- if info:
- head = head + ' (%s)' % join(info, ', ')
- result = self.heading(
- head, '#ffffff', '#7799ee', '<a href=".">index</a><br>' + filelink)
+ def docmodule(self, object, name=None, mod=None, packageContext=None, *ignored):
+ """Produce HTML documentation for a module object."""
+ name = object.__name__ # ignore the passed-in name
+ parts = split(name, '.')
+ links = []
+ for i in range(len(parts)-1):
+ links.append(
+ '<a href="%s.html"><font color="#ffffff">%s</font></a>' %
+ (join(parts[:i+1], '.'), parts[i]))
+ linkedname = join(links + parts[-1:], '.')
+ head = '<big><big><strong>%s</strong></big></big>' % linkedname
+ try:
+ path = inspect.getabsfile(object)
+ url = path
+ if sys.platform == 'win32':
+ import nturl2path
+ url = nturl2path.pathname2url(path)
+ filelink = '<a href="file:%s">%s</a>' % (url, path)
+ except TypeError:
+ filelink = '(built-in)'
+ info = []
+ if hasattr(object, '__version__'):
+ version = str(object.__version__)
+ if version[:11] == '$' + 'Revision: ' and version[-1:] == '$':
+ version = strip(version[11:-1])
+ info.append('version %s' % self.escape(version))
+ if hasattr(object, '__date__'):
+ info.append(self.escape(str(object.__date__)))
+ if info:
+ head = head + ' (%s)' % join(info, ', ')
+ result = self.heading(
+ head, '#ffffff', '#7799ee', '<a href=".">index</a><br>' + filelink)
- modules = inspect.getmembers(object, inspect.ismodule)
+ modules = inspect.getmembers(object, inspect.ismodule)
- classes, cdict = [], {}
- for key, value in inspect.getmembers(object, inspect.isclass):
- if (inspect.getmodule(value) or object) is object:
- classes.append((key, value))
- cdict[key] = cdict[value] = '#' + key
- for key, value in classes:
- for base in value.__bases__:
- key, modname = base.__name__, base.__module__
- module = sys.modules.get(modname)
- if modname != name and module and hasattr(module, key):
- if getattr(module, key) is base:
- if not cdict.has_key(key):
- cdict[key] = cdict[base] = modname + '.html#' + key
- funcs, fdict = [], {}
- for key, value in inspect.getmembers(object, inspect.isroutine):
- if inspect.isbuiltin(value) or inspect.getmodule(value) is object:
- funcs.append((key, value))
- fdict[key] = '#-' + key
- if inspect.isfunction(value): fdict[value] = fdict[key]
- data = []
- for key, value in inspect.getmembers(object, pydoc.isdata):
- if key not in ['__builtins__', '__doc__']:
- data.append((key, value))
+ classes, cdict = [], {}
+ for key, value in inspect.getmembers(object, inspect.isclass):
+ if (inspect.getmodule(value) or object) is object:
+ classes.append((key, value))
+ cdict[key] = cdict[value] = '#' + key
+ for key, value in classes:
+ for base in value.__bases__:
+ key, modname = base.__name__, base.__module__
+ module = sys.modules.get(modname)
+ if modname != name and module and hasattr(module, key):
+ if getattr(module, key) is base:
+ if key not in cdict:
+ cdict[key] = cdict[base] = modname + '.html#' + key
+ funcs, fdict = [], {}
+ for key, value in inspect.getmembers(object, inspect.isroutine):
+ if inspect.isbuiltin(value) or inspect.getmodule(value) is object:
+ funcs.append((key, value))
+ fdict[key] = '#-' + key
+ if inspect.isfunction(value):
+ fdict[value] = fdict[key]
+ data = []
+ for key, value in inspect.getmembers(object, pydoc.isdata):
+ if key not in ['__builtins__', '__doc__']:
+ data.append((key, value))
- doc = self.markup(pydoc.getdoc(object), self.preformat, fdict, cdict)
- doc = doc and '<tt>%s</tt>' % doc
- result = result + '<p>%s</p>\n' % doc
+ doc = self.markup(pydoc.getdoc(object), self.preformat, fdict, cdict)
+ doc = doc and '<tt>%s</tt>' % doc
+ result = result + '<p>%s</p>\n' % doc
- packageContext.clean ( classes, object )
- packageContext.clean ( funcs, object )
- packageContext.clean ( data, object )
-
- if hasattr(object, '__path__'):
- modpkgs = []
- modnames = []
- for file in os.listdir(object.__path__[0]):
- path = os.path.join(object.__path__[0], file)
- modname = inspect.getmodulename(file)
- if modname and modname not in modnames:
- modpkgs.append((modname, name, 0, 0))
- modnames.append(modname)
- elif pydoc.ispackage(path):
- modpkgs.append((file, name, 1, 0))
- modpkgs.sort()
- contents = self.multicolumn(modpkgs, self.modpkglink)
-## result = result + self.bigsection(
-## 'Package Contents', '#ffffff', '#aa55cc', contents)
- result = result + self.moduleSection( object, packageContext)
- elif modules:
- contents = self.multicolumn(
- modules, lambda (key, value), s=self: s.modulelink(value))
- result = result + self.bigsection(
- 'Modules', '#fffff', '#aa55cc', contents)
+ packageContext.clean(classes, object)
+ packageContext.clean(funcs, object)
+ packageContext.clean(data, object)
-
- if classes:
-## print classes
-## import pdb
-## pdb.set_trace()
- classlist = map(lambda (key, value): value, classes)
- contents = [
- self.formattree(inspect.getclasstree(classlist, 1), name)]
- for key, value in classes:
- contents.append(self.document(value, key, name, fdict, cdict))
- result = result + self.bigsection(
- 'Classes', '#ffffff', '#ee77aa', join(contents))
- if funcs:
- contents = []
- for key, value in funcs:
- contents.append(self.document(value, key, name, fdict, cdict))
- result = result + self.bigsection(
- 'Functions', '#ffffff', '#eeaa77', join(contents))
- if data:
- contents = []
- for key, value in data:
- try:
- contents.append(self.document(value, key))
- except Exception, err:
- pass
- result = result + self.bigsection(
- 'Data', '#ffffff', '#55aa55', join(contents, '<br>\n'))
- if hasattr(object, '__author__'):
- contents = self.markup(str(object.__author__), self.preformat)
- result = result + self.bigsection(
- 'Author', '#ffffff', '#7799ee', contents)
- if hasattr(object, '__credits__'):
- contents = self.markup(str(object.__credits__), self.preformat)
- result = result + self.bigsection(
- 'Credits', '#ffffff', '#7799ee', contents)
+ if hasattr(object, '__path__'):
+ modpkgs = []
+ modnames = []
+ for file in os.listdir(object.__path__[0]):
+ path = os.path.join(object.__path__[0], file)
+ modname = inspect.getmodulename(file)
+ if modname and modname not in modnames:
+ modpkgs.append((modname, name, 0, 0))
+ modnames.append(modname)
+ elif pydoc.ispackage(path):
+ modpkgs.append((file, name, 1, 0))
+ modpkgs.sort()
+ contents = self.multicolumn(modpkgs, self.modpkglink)
+ result = result + self.moduleSection(object, packageContext)
+ elif modules:
+ contents = self.multicolumn(
+ modules, lambda kvp, s=self: s.modulelink(kvp[1]))
+ result = result + self.bigsection(
+ 'Modules', '#fffff', '#aa55cc', contents)
- return result
+ if classes:
+ classlist = map(lambda kvp: kvp[1], classes)
+ contents = [
+ self.formattree(inspect.getclasstree(classlist, 1), name)]
+ for key, value in classes:
+ contents.append(self.document(value, key, name, fdict, cdict))
+ result = result + self.bigsection(
+ 'Classes', '#ffffff', '#ee77aa', join(contents))
+ if funcs:
+ contents = []
+ for key, value in funcs:
+ contents.append(self.document(value, key, name, fdict, cdict))
+ result = result + self.bigsection(
+ 'Functions', '#ffffff', '#eeaa77', join(contents))
+ if data:
+ contents = []
+ for key, value in data:
+ try:
+ contents.append(self.document(value, key))
+ except Exception as err:
+ pass
+ result = result + self.bigsection(
+ 'Data', '#ffffff', '#55aa55', join(contents, '<br>\n'))
+ if hasattr(object, '__author__'):
+ contents = self.markup(str(object.__author__), self.preformat)
+ result = result + self.bigsection(
+ 'Author', '#ffffff', '#7799ee', contents)
+ if hasattr(object, '__credits__'):
+ contents = self.markup(str(object.__credits__), self.preformat)
+ result = result + self.bigsection(
+ 'Credits', '#ffffff', '#7799ee', contents)
- def classlink(self, object, modname):
- """Make a link for a class."""
- name, module = object.__name__, sys.modules.get(object.__module__)
- if hasattr(module, name) and getattr(module, name) is object:
- return '<a href="%s.html#%s">%s</a>' % (
- module.__name__, name, name
- )
- return pydoc.classname(object, modname)
-
- def moduleSection( self, object, packageContext ):
- """Create a module-links section for the given object (module)"""
- modules = inspect.getmembers(object, inspect.ismodule)
- packageContext.clean ( modules, object )
- packageContext.recurseScan( modules )
+ return result
- if hasattr(object, '__path__'):
- modpkgs = []
- modnames = []
- for file in os.listdir(object.__path__[0]):
- path = os.path.join(object.__path__[0], file)
- modname = inspect.getmodulename(file)
- if modname and modname not in modnames:
- modpkgs.append((modname, object.__name__, 0, 0))
- modnames.append(modname)
- elif pydoc.ispackage(path):
- modpkgs.append((file, object.__name__, 1, 0))
- modpkgs.sort()
- # do more recursion here...
- for (modname, name, ya,yo) in modpkgs:
- packageContext.addInteresting( join( (object.__name__, modname), '.'))
- items = []
- for (modname, name, ispackage,isshadowed) in modpkgs:
- try:
- # get the actual module object...
-## if modname == "events":
-## import pdb
-## pdb.set_trace()
- module = pydoc.safeimport( "%s.%s"%(name,modname) )
- description, documentation = pydoc.splitdoc( inspect.getdoc( module ))
- if description:
- items.append(
- """%s -- %s"""% (
- self.modpkglink( (modname, name, ispackage, isshadowed) ),
- description,
- )
- )
- else:
- items.append(
- self.modpkglink( (modname, name, ispackage, isshadowed) )
- )
- except:
- items.append(
- self.modpkglink( (modname, name, ispackage, isshadowed) )
- )
- contents = string.join( items, '<br>')
- result = self.bigsection(
- 'Package Contents', '#ffffff', '#aa55cc', contents)
- elif modules:
- contents = self.multicolumn(
- modules, lambda (key, value), s=self: s.modulelink(value))
- result = self.bigsection(
- 'Modules', '#fffff', '#aa55cc', contents)
- else:
- result = ""
- return result
-
-
+ def classlink(self, object, modname):
+ """Make a link for a class."""
+ name, module = object.__name__, sys.modules.get(object.__module__)
+ if hasattr(module, name) and getattr(module, name) is object:
+ return '<a href="%s.html#%s">%s</a>' % (
+ module.__name__, name, name
+ )
+ return pydoc.classname(object, modname)
+
+ def moduleSection(self, object, packageContext):
+ """Create a module-links section for the given object (module)"""
+ modules = inspect.getmembers(object, inspect.ismodule)
+ packageContext.clean(modules, object)
+ packageContext.recurseScan(modules)
+
+ if hasattr(object, '__path__'):
+ modpkgs = []
+ modnames = []
+ for file in os.listdir(object.__path__[0]):
+ path = os.path.join(object.__path__[0], file)
+ modname = inspect.getmodulename(file)
+ if modname and modname not in modnames:
+ modpkgs.append((modname, object.__name__, 0, 0))
+ modnames.append(modname)
+ elif pydoc.ispackage(path):
+ modpkgs.append((file, object.__name__, 1, 0))
+ modpkgs.sort()
+ # do more recursion here...
+ for (modname, name, ya, yo) in modpkgs:
+ packageContext.addInteresting(join((object.__name__, modname), '.'))
+ items = []
+ for (modname, name, ispackage, isshadowed) in modpkgs:
+ try:
+ # get the actual module object...
+ module = pydoc.safeimport("%s.%s" % (name, modname))
+ description, documentation = pydoc.splitdoc(inspect.getdoc(module))
+ if description:
+ items.append(
+ """%s -- %s""" % (
+ self.modpkglink((modname, name, ispackage, isshadowed)),
+ description,
+ )
+ )
+ else:
+ items.append(
+ self.modpkglink((modname, name, ispackage, isshadowed))
+ )
+ except:
+ items.append(
+ self.modpkglink((modname, name, ispackage, isshadowed))
+ )
+ contents = string.join(items, '<br>')
+ result = self.bigsection(
+ 'Package Contents', '#ffffff', '#aa55cc', contents)
+ elif modules:
+ contents = self.multicolumn(
+ modules, lambda kvp, s=self: s.modulelink(kvp[1]))
+ result = self.bigsection(
+ 'Modules', '#fffff', '#aa55cc', contents)
+ else:
+ result = ""
+ return result
+
+
class AlreadyDone(Exception):
- pass
-
+ pass
class PackageDocumentationGenerator:
- """A package document generator creates documentation
- for an entire package using pydoc's machinery.
+ """A package document generator creates documentation
+ for an entire package using pydoc's machinery.
- baseModules -- modules which will be included
- and whose included and children modules will be
- considered fair game for documentation
- destinationDirectory -- the directory into which
- the HTML documentation will be written
- recursion -- whether to add modules which are
- referenced by and/or children of base modules
- exclusions -- a list of modules whose contents will
- not be shown in any other module, commonly
- such modules as OpenGL.GL, wxPython.wx etc.
- recursionStops -- a list of modules which will
- explicitly stop recursion (i.e. they will never
- be included), even if they are children of base
- modules.
- formatter -- allows for passing in a custom formatter
- see DefaultFormatter for sample implementation.
- """
- def __init__ (
- self, baseModules, destinationDirectory = ".",
- recursion = 1, exclusions = (),
- recursionStops = (),
- formatter = None
- ):
- self.destinationDirectory = os.path.abspath( destinationDirectory)
- self.exclusions = {}
- self.warnings = []
- self.baseSpecifiers = {}
- self.completed = {}
- self.recursionStops = {}
- self.recursion = recursion
- for stop in recursionStops:
- self.recursionStops[ stop ] = 1
- self.pending = []
- for exclusion in exclusions:
- try:
- self.exclusions[ exclusion ]= pydoc.locate ( exclusion)
- except pydoc.ErrorDuringImport, value:
- self.warn( """Unable to import the module %s which was specified as an exclusion module"""% (repr(exclusion)))
- self.formatter = formatter or DefaultFormatter()
- for base in baseModules:
- self.addBase( base )
- def warn( self, message ):
- """Warnings are used for recoverable, but not necessarily ignorable conditions"""
- self.warnings.append (message)
- def info (self, message):
- """Information/status report"""
- print message
- def addBase(self, specifier):
- """Set the base of the documentation set, only children of these modules will be documented"""
- try:
- self.baseSpecifiers [specifier] = pydoc.locate ( specifier)
- self.pending.append (specifier)
- except pydoc.ErrorDuringImport, value:
- self.warn( """Unable to import the module %s which was specified as a base module"""% (repr(specifier)))
- def addInteresting( self, specifier):
- """Add a module to the list of interesting modules"""
- if self.checkScope( specifier):
-## print "addInteresting", specifier
- self.pending.append (specifier)
- else:
- self.completed[ specifier] = 1
- def checkScope (self, specifier):
- """Check that the specifier is "in scope" for the recursion"""
- if not self.recursion:
- return 0
- items = string.split (specifier, ".")
- stopCheck = items [:]
- while stopCheck:
- name = string.join(items, ".")
- if self.recursionStops.get( name):
- return 0
- elif self.completed.get (name):
- return 0
- del stopCheck[-1]
- while items:
- if self.baseSpecifiers.get( string.join(items, ".")):
- return 1
- del items[-1]
- # was not within any given scope
- return 0
+ baseModules -- modules which will be included
+ and whose included and children modules will be
+ considered fair game for documentation
+ destinationDirectory -- the directory into which
+ the HTML documentation will be written
+ recursion -- whether to add modules which are
+ referenced by and/or children of base modules
+ exclusions -- a list of modules whose contents will
+ not be shown in any other module, commonly
+ such modules as OpenGL.GL, wxPython.wx etc.
+ recursionStops -- a list of modules which will
+ explicitly stop recursion (i.e. they will never
+ be included), even if they are children of base
+ modules.
+ formatter -- allows for passing in a custom formatter
+ see DefaultFormatter for sample implementation.
+ """
+ def __init__(
+ self, baseModules, destinationDirectory=".",
+ recursion=1, exclusions=(),
+ recursionStops=(),
+ formatter=None
+ ):
+ self.destinationDirectory = os.path.abspath(destinationDirectory)
+ self.exclusions = {}
+ self.warnings = []
+ self.baseSpecifiers = {}
+ self.completed = {}
+ self.recursionStops = {}
+ self.recursion = recursion
+ for stop in recursionStops:
+ self.recursionStops[stop] = 1
+ self.pending = []
+ for exclusion in exclusions:
+ try:
+ self.exclusions[exclusion] = pydoc.locate(exclusion)
+ except pydoc.ErrorDuringImport as value:
+ self.warn(
+ ("Unable to import the module %s which "
+ "was specified as an exclusion module") % (repr(exclusion)))
+ self.formatter = formatter or DefaultFormatter()
+ for base in baseModules:
+ self.addBase(base)
- def process( self ):
- """Having added all of the base and/or interesting modules,
- proceed to generate the appropriate documentation for each
- module in the appropriate directory, doing the recursion
- as we go."""
- try:
- while self.pending:
- try:
- if self.completed.has_key( self.pending[0] ):
- raise AlreadyDone( self.pending[0] )
- self.info( """Start %s"""% (repr(self.pending[0])))
- object = pydoc.locate ( self.pending[0] )
- self.info( """ ... found %s"""% (repr(object.__name__)))
- except AlreadyDone:
- pass
- except pydoc.ErrorDuringImport, value:
- self.info( """ ... FAILED %s"""% (repr( value)))
- self.warn( """Unable to import the module %s"""% (repr(self.pending[0])))
- except (SystemError, SystemExit), value:
- self.info( """ ... FAILED %s"""% (repr( value)))
- self.warn( """Unable to import the module %s"""% (repr(self.pending[0])))
- except Exception, value:
- self.info( """ ... FAILED %s"""% (repr( value)))
- self.warn( """Unable to import the module %s"""% (repr(self.pending[0])))
- else:
- page = self.formatter.page(
- pydoc.describe(object),
- self.formatter.docmodule(
- object,
- object.__name__,
- packageContext = self,
- )
- )
- file = open (
- os.path.join(
- self.destinationDirectory,
- self.pending[0] + ".html",
- ),
- 'w',
- )
- file.write(page)
- file.close()
- self.completed[ self.pending[0]] = object
- del self.pending[0]
- finally:
- for item in self.warnings:
- print item
-
- def clean (self, objectList, object):
- """callback from the formatter object asking us to remove
- those items in the key, value pairs where the object is
- imported from one of the excluded modules"""
- for key, value in objectList[:]:
- for excludeObject in self.exclusions.values():
- if hasattr( excludeObject, key ) and excludeObject is not object:
- if (
- getattr( excludeObject, key) is value or
- (hasattr( excludeObject, '__name__') and
- excludeObject.__name__ == "Numeric"
- )
- ):
- objectList[:] = [ (k,o) for k,o in objectList if k != key ]
- def recurseScan(self, objectList):
- """Process the list of modules trying to add each to the
- list of interesting modules"""
- for key, value in objectList:
- self.addInteresting( value.__name__ )
+ def warn(self, message):
+ """Warnings are used for recoverable, but not necessarily ignorable conditions"""
+ self.warnings.append(message)
+ def info(self, message):
+ """Information/status report"""
+ print(message)
+
+ def addBase(self, specifier):
+ """Set the base of the documentation set, only children of these modules will
+ be documented"""
+ try:
+ self.baseSpecifiers[specifier] = pydoc.locate(specifier)
+ self.pending.append(specifier)
+ except pydoc.ErrorDuringImport as value:
+ self.warn(
+ "Unable to import the module %s which was specified as a base module" %
+ (repr(specifier)))
+
+ def addInteresting(self, specifier):
+ """Add a module to the list of interesting modules"""
+ if self.checkScope(specifier):
+ self.pending.append(specifier)
+ else:
+ self.completed[specifier] = 1
+
+ def checkScope(self, specifier):
+ """Check that the specifier is "in scope" for the recursion"""
+ if not self.recursion:
+ return 0
+ items = string.split(specifier, ".")
+ stopCheck = items[:]
+ while stopCheck:
+ name = string.join(items, ".")
+ if self.recursionStops.get(name):
+ return 0
+ elif self.completed.get(name):
+ return 0
+ del stopCheck[-1]
+ while items:
+ if self.baseSpecifiers.get(string.join(items, ".")):
+ return 1
+ del items[-1]
+ # was not within any given scope
+ return 0
+
+ def process(self):
+ """Having added all of the base and/or interesting modules,
+ proceed to generate the appropriate documentation for each
+ module in the appropriate directory, doing the recursion
+ as we go."""
+ try:
+ while self.pending:
+ try:
+ if self.pending[0] in self.completed:
+ raise AlreadyDone(self.pending[0])
+ self.info("""Start %s""" % (repr(self.pending[0])))
+ object = pydoc.locate(self.pending[0])
+ self.info(""" ... found %s""" % (repr(object.__name__)))
+ except AlreadyDone:
+ pass
+ except pydoc.ErrorDuringImport as value:
+ self.info(" ... FAILED %s" % (repr(value)))
+ self.warn("Unable to import the module %s" % (repr(self.pending[0])))
+ except (SystemError, SystemExit) as value:
+ self.info(" ... FAILED %s" % (repr(value)))
+ self.warn("Unable to import the module %s" % (repr(self.pending[0])))
+ except Exception as value:
+ self.info(" ... FAILED %s" % (repr(value)))
+ self.warn("Unable to import the module %s" % (repr(self.pending[0])))
+ else:
+ page = self.formatter.page(
+ pydoc.describe(object),
+ self.formatter.docmodule(
+ object,
+ object.__name__,
+ packageContext=self,
+ )
+ )
+ file = open(
+ os.path.join(
+ self.destinationDirectory,
+ self.pending[0] + ".html",
+ ),
+ 'w',
+ )
+ file.write(page)
+ file.close()
+ self.completed[self.pending[0]] = object
+ del self.pending[0]
+ finally:
+ for item in self.warnings:
+ print(item)
+
+ def clean(self, objectList, object):
+ """callback from the formatter object asking us to remove
+ those items in the key, value pairs where the object is
+ imported from one of the excluded modules"""
+ for key, value in objectList[:]:
+ for excludeObject in self.exclusions.values():
+ if hasattr(excludeObject, key) and excludeObject is not object:
+ if (
+ getattr(excludeObject, key) is value or
+ (hasattr(excludeObject, '__name__') and
+ excludeObject.__name__ == "Numeric"
+ )
+ ):
+ objectList[:] = [(k, o) for k, o in objectList if k != key]
+
+ def recurseScan(self, objectList):
+ """Process the list of modules trying to add each to the
+ list of interesting modules"""
+ for key, value in objectList:
+ self.addInteresting(value.__name__)
if __name__ == "__main__":
- excludes = [
- "OpenGL.GL",
- "OpenGL.GLU",
- "OpenGL.GLUT",
- "OpenGL.GLE",
- "OpenGL.GLX",
- "wxPython.wx",
- "Numeric",
- "_tkinter",
- "Tkinter",
- ]
+ excludes = [
+ "OpenGL.GL",
+ "OpenGL.GLU",
+ "OpenGL.GLUT",
+ "OpenGL.GLE",
+ "OpenGL.GLX",
+ "wxPython.wx",
+ "Numeric",
+ "_tkinter",
+ "Tkinter",
+ ]
- modules = [
- "OpenGLContext.debug",
-## "wxPython.glcanvas",
-## "OpenGL.Tk",
-## "OpenGL",
- ]
- PackageDocumentationGenerator(
- baseModules = modules,
- destinationDirectory = "z:\\temp",
- exclusions = excludes,
- ).process ()
-
+ modules = [
+ "OpenGLContext.debug",
+ ]
+ PackageDocumentationGenerator(
+ baseModules=modules,
+ destinationDirectory="z:\\temp",
+ exclusions=excludes,
+ ).process()
diff --git a/examples/amicommand.py b/examples/amicommand.py
index ee0e7b1..04ac913 100644
--- a/examples/amicommand.py
+++ b/examples/amicommand.py
@@ -1,35 +1,38 @@
#! /usr/bin/env python
"""Test/sample to call "show database" command
"""
-from twisted.application import service, internet
-from twisted.internet import reactor, defer
-from starpy import manager, fastagi
+from twisted.internet import reactor
+from starpy import manager
import utilapplication
-import menu
-import os, logging, pprint, time
+import logging
-log = logging.getLogger( 'callduration' )
+log = logging.getLogger('callduration')
APPLICATION = utilapplication.UtilApplication()
+
def main():
- def onConnect( ami ):
- def onResult( result ):
- print 'Result', result
+
+ def onConnect(ami):
+
+ def onResult(result):
+ print('Result', result)
return ami.logoff()
- def onError( reason ):
- print reason.getTraceback()
+
+ def onError(reason):
+ print(reason.getTraceback())
return reason
- def onFinished( result ):
+
+ def onFinished(result):
reactor.stop()
- df = ami.command( 'database show' )
- df.addCallbacks( onResult, onError )
- df.addCallbacks( onFinished, onFinished )
+ df = ami.command('database show')
+ df.addCallbacks(onResult, onError)
+ df.addCallbacks(onFinished, onFinished)
return df
- amiDF = APPLICATION.amiSpecifier.login(
- ).addCallback( onConnect )
+ APPLICATION.amiSpecifier.login().addCallback(onConnect)
+
if __name__ == "__main__":
logging.basicConfig()
- manager.log.setLevel( logging.DEBUG )
- reactor.callWhenRunning( main )
+ manager.log.setLevel(logging.DEBUG)
+ reactor.callWhenRunning(main)
reactor.run()
diff --git a/examples/autosurvey/frontend.py b/examples/autosurvey/frontend.py
index da50378..7843a34 100644
--- a/examples/autosurvey/frontend.py
+++ b/examples/autosurvey/frontend.py
@@ -1,167 +1,176 @@
"""Simple HTTP Server using twisted.web2"""
-from nevow import rend, appserver, inevow, tags, loaders
-from twisted.application import service, internet
-from twisted.internet import reactor, defer
+from nevow import rend, appserver, inevow, loaders
+from twisted.application import internet
+from twisted.internet import reactor
from starpy import manager, fastagi, utilapplication
from basicproperty import common, basic, propertied, weak
-import os, logging, pprint, time
+import logging
+import random
+import sys
-log = logging.getLogger( 'autosurvey' )
-
-class Application( utilapplication.UtilApplication ):
- """Services provided at the application level"""
- surveys = common.DictionaryProperty(
- "surveys", """Set of surveys indexed by survey/extension number""",
- )
+log = logging.getLogger('autosurvey')
+class Application(utilapplication.UtilApplication):
+ """Services provided at the application level"""
+ surveys = common.DictionaryProperty(
+ "surveys", """Set of surveys indexed by survey/extension number""",
+ )
-class Survey( propertied.Propertied ):
- """Models a single survey to be completed"""
- surveyId = common.IntegerProperty(
- "surveyId", """Unique identifier for this survey""",
- )
- owner = basic.BasicProperty(
- "owner", """Owner's phone number to which to connect""",
- )
- questions = common.ListProperty(
- "questions", """Set of questions which make up the survey""",
- )
- YOU_CURRENTLY_HAVE = 'vm-youhave'
- QUESTIONS_IN_YOUR_SURVEY = 'vm-messages'
- QUESTION_IN_YOUR_SURVEY = 'vm-message'
- TO_LISTEN_TO_SURVEY_QUESTION = 'to-listen-to-it'
- TO_RECORD_A_NEW_SURVEY_QUESTION = 'to-rerecord-it'
- TO_FINISH_SURVEY_SETUP = 'vm-helpexit'
- def setupSurvey( self, agi ):
- """AGI application to allow the user to set up the survey
-
- Screen 1:
- You have # questions.
- To listen to a question, press the number of the question.
- To record a new question, press pound.
- To finish setup, press star.
- """
- seq = fastagi.InSequence( )
- seq.append( agi.wait, 2 )
- base = """You currently have %s question%s.
- To listen to a question press the number of the question.
- To record a new question, press pound.
- To finish survey setup, press star.
- """%(
- len(self.questions),
- ['','s'][len(self.questions)==1],
- )
- if len(base) != 1:
- base += 's'
- base = " ".join(base.split())
- seq.append( agi.execute, 'Festival', base )
- seq.append( agi.finish, )
- return seq()
- seq.append( agi.streamFile, self.YOU_CURRENTLY_HAVE )
- seq.append( agi.sayNumber, len(self.questions))
- if len(self.questions) == 1:
- seq.append( agi.streamFile, self.QUESTION_IN_YOUR_SURVEY )
- else:
- seq.append( agi.streamFile, self.QUESTIONS_IN_YOUR_SURVEY )
- seq.append( agi.streamFile, self.TO_LISTEN_TO_SURVEY_QUESTION )
- seq.append( agi.streamFile, self.TO_RECORD_A_NEW_SURVEY_QUESTION )
- seq.append( agi.streamFile, self.TO_FINISH_SURVEY_SETUP )
- seq.append( agi.finish, )
- return seq()
- def newQuestionId( self ):
- """Return a new, unique, question id"""
- import random, sys
- bad = True
- while bad:
- bad = False
- id = random.randint(0,sys.maxint)
- for question in self.questions:
- if id == question.__dict__.get('questionId'):
- bad = True
- return id
-class Question( propertied.Propertied ):
- survey = weak.WeakProperty(
- "survey", """Our survey object""",
- )
- questionId = common.IntegerProperty(
- "questionId", """Unique identifier for our question""",
- defaultFunction = lambda prop,client: client.survey.newQuestionId(),
- )
- def recordQuestion( self, agi, number=None ):
- """Record a question (number)"""
- return agi.recordFile(
- '%s.%s'%(self.survey.surveyId,self.questionId),
- 'gsm',
- '#*',
- timeout=60,
- beep = True,
- silence=5,
- ).addCallback(
- self.onRecorded, agi=agi
- ).addErrback(self.onRecordAborted, agi=agi )
- def onRecorded( self, result, agi ):
- """Handle recording of the question"""
-
-def getManagerAPI( username, password, server='127.0.0.1', port=5038 ):
- """Retrieve a logged-in manager API"""
+class Survey(propertied.Propertied):
+ """Models a single survey to be completed"""
+ surveyId = common.IntegerProperty(
+ "surveyId", """Unique identifier for this survey""",
+ )
+ owner = basic.BasicProperty(
+ "owner", """Owner's phone number to which to connect""",
+ )
+ questions = common.ListProperty(
+ "questions", """Set of questions which make up the survey""",
+ )
+ YOU_CURRENTLY_HAVE = 'vm-youhave'
+ QUESTIONS_IN_YOUR_SURVEY = 'vm-messages'
+ QUESTION_IN_YOUR_SURVEY = 'vm-message'
+ TO_LISTEN_TO_SURVEY_QUESTION = 'to-listen-to-it'
+ TO_RECORD_A_NEW_SURVEY_QUESTION = 'to-rerecord-it'
+ TO_FINISH_SURVEY_SETUP = 'vm-helpexit'
+
+ def setupSurvey(self, agi):
+ """AGI application to allow the user to set up the survey
+
+ Screen 1:
+ You have # questions.
+ To listen to a question, press the number of the question.
+ To record a new question, press pound.
+ To finish setup, press star.
+ """
+ seq = fastagi.InSequence()
+ seq.append(agi.wait, 2)
+ base = """You currently have %s question%s.
+ To listen to a question press the number of the question.
+ To record a new question, press pound.
+ To finish survey setup, press star.
+ """ % (
+ len(self.questions),
+ ['', 's'][len(self.questions) == 1],
+ )
+ if len(base) != 1:
+ base += 's'
+ base = " ".join(base.split())
+ seq.append(agi.execute, 'Festival', base)
+ seq.append(agi.finish,)
+ return seq()
+ seq.append(agi.streamFile, self.YOU_CURRENTLY_HAVE)
+ seq.append(agi.sayNumber, len(self.questions))
+ if len(self.questions) == 1:
+ seq.append(agi.streamFile, self.QUESTION_IN_YOUR_SURVEY)
+ else:
+ seq.append(agi.streamFile, self.QUESTIONS_IN_YOUR_SURVEY)
+ seq.append(agi.streamFile, self.TO_LISTEN_TO_SURVEY_QUESTION)
+ seq.append(agi.streamFile, self.TO_RECORD_A_NEW_SURVEY_QUESTION)
+ seq.append(agi.streamFile, self.TO_FINISH_SURVEY_SETUP)
+ seq.append(agi.finish,)
+ return seq()
+
+ def newQuestionId(self):
+ """Return a new, unique, question id"""
+ bad = True
+ while bad:
+ bad = False
+ id = random.randint(0, sys.maxint)
+ for question in self.questions:
+ if id == question.__dict__.get('questionId'):
+ bad = True
+ return id
+
+
+class Question(propertied.Propertied):
+ survey = weak.WeakProperty(
+ "survey", """Our survey object""",
+ )
+ questionId = common.IntegerProperty(
+ "questionId", """Unique identifier for our question""",
+ defaultFunction=lambda prop, client: client.survey.newQuestionId(),
+ )
+
+ def recordQuestion(self, agi, number=None):
+ """Record a question (number)"""
+ return agi.recordFile(
+ '%s.%s' % (self.survey.surveyId, self.questionId),
+ 'gsm',
+ '#*',
+ timeout=60,
+ beep=True,
+ silence=5,
+ ).addCallback(
+ self.onRecorded, agi=agi
+ ).addErrback(self.onRecordAborted, agi=agi)
+
+ def onRecorded(self, result, agi):
+ """Handle recording of the question"""
+ pass
+
+
+def getManagerAPI(username, password, server='127.0.0.1', port=5038):
+ """Retrieve a logged-in manager API"""
+
class SurveySetup(rend.Page):
- """Page displaying the survey setup"""
- addSlash = True
- docFactory = loaders.htmlfile( 'index.html' )
-
-class RecordFunction( rend.Page ):
- """Page/application to record survey via call to user"""
- def renderHTTP( self, ctx ):
- """Process rendering of the request"""
- # process request parameters...
- request = inevow.IRequest( ctx )
- # XXX sanitise and check value...
- channel = 'SIP/%s'%( request.args['ownerName'][0], )
-
- df = APPLICATION.amiSpecifier.login()
- def onLogin( ami ):
- # Note that the connect comes in *before* the originate returns,
- # so we need to wait for the call before we even send it...
- userConnectDF = APPLICATION.waitForCallOn( '23', timeout=15 )
- APPLICATION.surveys['23'] = survey = Survey()
- userConnectDF.addCallback(
- survey.setupSurvey,
- )
- def onComplete( result ):
- return ami.logoff()
- ami.originate(# don't wait for this to complete...
- # XXX handle case where the originate fails differently
- # from the case where we just don't get a connection?
- channel,
- APPLICATION.agiSpecifier.context,
- '23',
- '1',
- timeout=14,
- ).addCallbacks( onComplete, onComplete )
- return userConnectDF
- return df.addCallback( onLogin )
+ """Page displaying the survey setup"""
+ addSlash = True
+ docFactory = loaders.htmlfile('index.html')
+class RecordFunction(rend.Page):
+ """Page/application to record survey via call to user"""
+ def renderHTTP(self, ctx):
+ """Process rendering of the request"""
+ # process request parameters...
+ request = inevow.IRequest(ctx)
+ # XXX sanitise and check value...
+ channel = 'SIP/%s' % (request.args['ownerName'][0],)
+
+ df = APPLICATION.amiSpecifier.login()
+
+ def onLogin(ami):
+ # Note that the connect comes in *before* the originate returns,
+ # so we need to wait for the call before we even send it...
+ userConnectDF = APPLICATION.waitForCallOn('23', timeout=15)
+ APPLICATION.surveys['23'] = survey = Survey()
+ userConnectDF.addCallback(survey.setupSurvey,)
+
+ def onComplete(result):
+ return ami.logoff()
+ ami.originate(
+ # don't wait for this to complete...
+ # XXX handle case where the originate fails differently
+ # from the case where we just don't get a connection?
+ channel,
+ APPLICATION.agiSpecifier.context,
+ '23',
+ '1',
+ timeout=14,
+ ).addCallbacks(onComplete, onComplete)
+ return userConnectDF
+ return df.addCallback(onLogin)
def main():
- """Create the web-site"""
- s = SurveySetup()
- s.putChild( 'record', RecordFunction() )
- site = appserver.NevowSite(s)
- webServer = internet.TCPServer(8080, site)
- webServer.startService()
+ """Create the web-site"""
+ s = SurveySetup()
+ s.putChild('record', RecordFunction())
+ site = appserver.NevowSite(s)
+ webServer = internet.TCPServer(8080, site)
+ webServer.startService()
+
if __name__ == "__main__":
- logging.basicConfig()
- log.setLevel( logging.DEBUG )
- manager.log.setLevel( logging.DEBUG )
- fastagi.log.setLevel( logging.DEBUG )
- APPLICATION = Application()
- APPLICATION.agiSpecifier.run( APPLICATION.dispatchIncomingCall )
- from twisted.internet import reactor
- reactor.callWhenRunning( main )
- reactor.run()
+ logging.basicConfig()
+ log.setLevel(logging.DEBUG)
+ manager.log.setLevel(logging.DEBUG)
+ fastagi.log.setLevel(logging.DEBUG)
+ APPLICATION = Application()
+ APPLICATION.agiSpecifier.run(APPLICATION.dispatchIncomingCall)
+ reactor.callWhenRunning(main)
+ reactor.run()
diff --git a/examples/calldurationcallback.py b/examples/calldurationcallback.py
index 00c8260..6ed6cc3 100644
--- a/examples/calldurationcallback.py
+++ b/examples/calldurationcallback.py
@@ -6,26 +6,29 @@
for the end of the call, then call them back with the
duration message.
"""
-from twisted.application import service, internet
from twisted.internet import reactor, defer
-from starpy import manager, fastagi
+from starpy import fastagi
import utilapplication
import menu
-import os, logging, pprint, time
+import logging
+import time
-log = logging.getLogger( 'callduration' )
+log = logging.getLogger('callduration')
-class Application( utilapplication.UtilApplication ):
+
+class Application(utilapplication.UtilApplication):
"""Application for the call duration callback mechanism"""
- def onS( self, agi ):
+
+ def onS(self, agi):
"""Incoming AGI connection to the "s" extension (start operation)"""
- log.info( """New call tracker""" )
+ log.info("""New call tracker""")
c = CallTracker()
- return c.recordChannelInfo( agi ).addErrback(
+ return c.recordChannelInfo(agi).addErrback(
agi.jumpOnError, difference=100,
)
-class CallTracker( object ):
+
+class CallTracker(object):
"""Object which tracks duration of a single call
This object encapsulates the entire interaction with the user, from
@@ -37,7 +40,8 @@
as all numeric extensions.
"""
ourContext = 'callduration'
- def __init__( self ):
+
+ def __init__(self):
"""Initialise the tracker object"""
self.uniqueChannelId = None
self.currentChannel = None
@@ -47,135 +51,151 @@
self.ami = None
self.startTime = None
self.stopTime = None
- def recordChannelInfo( self, agi ):
+
+ def recordChannelInfo(self, agi):
"""Records relevant channel information, creates manager watcher"""
self.uniqueChannelId = agi.variables['agi_uniqueid']
self.currentChannel = currentChannel = agi.variables['agi_channel']
- # XXX everything up to the last - is normally our local caller's "address"
+ # XXX everything up to the last - normally our local caller's "address"
# this is not, however, a great way to decide who to call back...
- self.callbackChannel = currentChannel.rsplit( '-', 1)[0]
+ self.callbackChannel = currentChannel.rsplit('-', 1)[0]
# Ask user for the account number...
df = menu.CollectDigits(
- soundFile = 'your-account',
- maxDigits = 7,
- minDigits = 3,
- timeout = 5,
- )( agi ).addCallback(
- self.onAccountInput,agi=agi,
+ soundFile='your-account',
+ maxDigits=7,
+ minDigits=3,
+ timeout=5,
+ )(agi).addCallback(
+ self.onAccountInput, agi=agi,
)
# XXX handle AMI login failure...
amiDF = APPLICATION.amiSpecifier.login(
- ).addCallback( self.onAMIConnect )
- dl = defer.DeferredList( [df, amiDF] )
- return dl.addCallback( self.onConnectAndAccount )
- def onAccountInput( self, result, agi, retries=2):
+ ).addCallback(self.onAMIConnect)
+ dl = defer.DeferredList([df, amiDF])
+ return dl.addCallback(self.onConnectAndAccount)
+
+ def onAccountInput(self, result, agi, retries=2):
"""Allow user to enter again if timed out"""
self.account = result[0][1]
self.startTime = time.time()
- agi.finish() # let the user go about their business...
+ agi.finish() # let the user go about their business...
return agi
- def cleanUp( self, agi=None ):
+
+ def cleanUp(self, agi=None):
"""Cleanup on error as much as possible"""
items = []
if self.ami:
- items.append( self.ami.logoff())
+ items.append(self.ami.logoff())
self.ami = None
if items:
- return defer.DeferredList( items )
+ return defer.DeferredList(items)
else:
- return defer.succeed( False )
- def onAMIConnect( self, ami ):
+ return defer.succeed(False)
+
+ def onAMIConnect(self, ami):
"""We have successfully connected to the AMI"""
- log.debug( "AMI login complete" )
+ log.debug("AMI login complete")
if not self.cancelled:
self.ami = ami
return ami
else:
return self.ami.logoff()
- def onConnectAndAccount( self, results ):
+
+ def onConnectAndAccount(self, results):
"""We have connected and retrieved an account"""
- log.info( """AMI Connected and account information gathered: %s""", self.uniqueChannelId )
+ log.info("""AMI Connected and account information gathered: %s""",
+ self.uniqueChannelId)
df = defer.Deferred()
- def onChannelHangup( ami, event ):
+
+ def onChannelHangup(ami, event):
"""Deal with the hangup of an event"""
if event['uniqueid'] == self.uniqueChannelId:
- log.info( """AMI Detected close of our channel: %s""", self.uniqueChannelId )
+ log.info("""AMI Detected close of our channel: %s""",
+ self.uniqueChannelId)
self.stopTime = time.time()
# give the user a few seconds to put down the hand-set
- reactor.callLater( 2, df.callback, event )
- self.ami.deregisterEvent( 'Hangup', onChannelHangup )
- log.debug( 'event:', event )
+ reactor.callLater(2, df.callback, event)
+ self.ami.deregisterEvent('Hangup', onChannelHangup)
+ log.debug('event:', event)
if not self.cancelled:
- self.ami.registerEvent( 'Hangup', onChannelHangup )
- return df.addCallback( self.onHangup, callbacks=5 )
- def onHangup( self, event, callbacks=5 ):
+ self.ami.registerEvent('Hangup', onChannelHangup)
+ return df.addCallback(self.onHangup, callbacks=5)
+
+ def onHangup(self, event, callbacks=5):
"""Okay, the call is finished, time to inform the user"""
- log.debug( 'onHangup %s %s', event, callbacks )
- def ignoreResult( result ):
+ log.debug('onHangup %s %s', event, callbacks)
+
+ def ignoreResult(result):
"""Since we're using an equal timeout waiting for a connect
we don't care *how* this fails/succeeds"""
pass
self.ami.originate(
self.callbackChannel,
self.ourContext, id(self), 1,
- timeout = 15,
- ).addCallbacks( ignoreResult, ignoreResult )
- df = APPLICATION.waitForCallOn( id(self), 15 )
+ timeout=15,
+ ).addCallbacks(ignoreResult, ignoreResult)
+ df = APPLICATION.waitForCallOn(id(self), 15)
df.addCallbacks(
self.onUserReconnected, self.onUserReconnectFail,
- errbackKeywords = { 'event': event, 'callbacks': callbacks-1 },
+ errbackKeywords={'event': event, 'callbacks': callbacks-1},
)
- def onUserReconnectFail( self, reason, event, callbacks ):
+
+ def onUserReconnectFail(self, reason, event, callbacks):
"""Wait for bit, then retry..."""
if callbacks:
# XXX really want something like a decaying back-off in frequency
# with final values of e.g. an hour...
- log.info( """Failure connecting: will retry in 30 seconds""" )
- reactor.callLater( 30, self.onHangup, event, callbacks )
+ log.info("""Failure connecting: will retry in 30 seconds""")
+ reactor.callLater(30, self.onHangup, event, callbacks)
else:
- log.error( """Unable to connect to user, giving up""" )
- return self.cleanUp( None )
- def onUserReconnected( self, agi ):
+ log.error("""Unable to connect to user, giving up""")
+ return self.cleanUp(None)
+
+ def onUserReconnected(self, agi):
"""Handle the user interaction after they've re-connected"""
- log.info( """Connection re-established with the user""" )
+ log.info("""Connection re-established with the user""")
# XXX should handle unexpected failures in here...
delta = self.stopTime - self.startTime
- minutes, seconds = divmod( delta, 60 )
+ minutes, seconds = divmod(delta, 60)
seconds = int(seconds)
- hours, minutes = divmod( minutes, 60 )
+ hours, minutes = divmod(minutes, 60)
duration = []
if hours:
- duration.append( '%s hour%s'%(hours,['','s'][hours!=1]))
+ duration.append('%s hour%s' % (hours, ['', 's'][hours != 1]))
if minutes:
- duration.append( '%s second%s'%(minutes,['','s'][minutes!=1]))
+ duration.append('%s second%s' % (minutes, ['', 's'][minutes != 1]))
if seconds:
- duration.append( '%s second%s'%(seconds,['','s'][seconds!=1]))
+ duration.append('%s second%s' % (seconds, ['', 's'][seconds != 1]))
if not duration:
duration = '0'
else:
- duration = " ".join( duration )
- seq = fastagi.InSequence( )
- seq.append( agi.wait, 1 )
- seq.append( agi.execute, "Festival", "Call to account %r took %s"%(self.account,duration) )
- seq.append( agi.wait, 1 )
- seq.append( agi.execute, "Festival", "Repeating, call to account %r took %s"%(self.account,duration) )
- seq.append( agi.wait, 1 )
- seq.append( agi.finish )
- def logSuccess( ):
- log.debug( """Finished successfully!""" )
- return defer.succeed( True )
- seq.append( logSuccess )
- seq.append( self.cleanUp, agi )
+ duration = " ".join(duration)
+ seq = fastagi.InSequence()
+ seq.append(agi.wait, 1)
+ seq.append(agi.execute, "Festival", "Call to account %r took %s" % (
+ self.account, duration))
+ seq.append(agi.wait, 1)
+ seq.append(agi.execute, "Festival",
+ "Repeating, call to account %r took %s" % (
+ self.account, duration))
+ seq.append(agi.wait, 1)
+ seq.append(agi.finish)
+
+ def logSuccess():
+ log.debug("""Finished successfully!""")
+ return defer.succeed(True)
+ seq.append(logSuccess)
+ seq.append(self.cleanUp, agi)
return seq()
+
APPLICATION = Application()
if __name__ == "__main__":
logging.basicConfig()
- log.setLevel( logging.DEBUG )
- #manager.log.setLevel( logging.DEBUG )
- #fastagi.log.setLevel( logging.DEBUG )
- APPLICATION.handleCallsFor( 's', APPLICATION.onS )
- APPLICATION.agiSpecifier.run( APPLICATION.dispatchIncomingCall )
- from twisted.internet import reactor
+ log.setLevel(logging.DEBUG)
+ # manager.log.setLevel(logging.DEBUG)
+ # fastagi.log.setLevel(logging.DEBUG)
+ APPLICATION.handleCallsFor('s', APPLICATION.onS)
+ APPLICATION.agiSpecifier.run(APPLICATION.dispatchIncomingCall)
reactor.run()
diff --git a/examples/connecttoivr.py b/examples/connecttoivr.py
index 24dcf63..3790390 100644
--- a/examples/connecttoivr.py
+++ b/examples/connecttoivr.py
@@ -1,38 +1,46 @@
"""Example script to generate a call to connect a remote channel to an IVR"""
from starpy import manager
from twisted.internet import reactor
-import sys, logging
+import sys
+import logging
-def main( channel = 'sip/20035 at aci.on.ca', connectTo=('outgoing','s','1') ):
- f = manager.AMIFactory(sys.argv[1], sys.argv[2])
- df = f.login()
- def onLogin( protocol ):
- """On Login, attempt to originate the call"""
- context, extension, priority = connectTo
- df = protocol.originate(
- channel,
- context,extension,priority,
- )
- def onFinished( result ):
- df = protocol.logoff()
- def onLogoff( result ):
- reactor.stop()
- return df.addCallbacks( onLogoff, onLogoff )
- def onFailure( reason ):
- print reason.getTraceback()
- return reason
- df.addErrback( onFailure )
- df.addCallbacks( onFinished, onFinished )
- return df
- def onFailure( reason ):
- """Unable to log in!"""
- print reason.getTraceback()
- reactor.stop()
- df.addCallbacks( onLogin, onFailure )
- return df
+
+def main(channel='sip/20035 at aci.on.ca', connectTo=('outgoing', 's', '1')):
+ f = manager.AMIFactory(sys.argv[1], sys.argv[2])
+ df = f.login()
+
+ def onLogin(protocol):
+ """On Login, attempt to originate the call"""
+ context, extension, priority = connectTo
+ df = protocol.originate(
+ channel,
+ context, extension, priority,
+ )
+
+ def onFinished(result):
+ df = protocol.logoff()
+
+ def onLogoff(result):
+ reactor.stop()
+ return df.addCallbacks(onLogoff, onLogoff)
+
+ def onFailure(reason):
+ print(reason.getTraceback())
+ return reason
+ df.addErrback(onFailure)
+ df.addCallbacks(onFinished, onFinished)
+ return df
+
+ def onFailure(reason):
+ """Unable to log in!"""
+ print(reason.getTraceback())
+ reactor.stop()
+ df.addCallbacks(onLogin, onFailure)
+ return df
+
if __name__ == "__main__":
- manager.log.setLevel( logging.DEBUG )
- logging.basicConfig()
- reactor.callWhenRunning( main )
- reactor.run()
+ manager.log.setLevel(logging.DEBUG)
+ logging.basicConfig()
+ reactor.callWhenRunning(main)
+ reactor.run()
diff --git a/examples/connecttoivrapp.py b/examples/connecttoivrapp.py
index aeed61d..829ab66 100644
--- a/examples/connecttoivrapp.py
+++ b/examples/connecttoivrapp.py
@@ -3,35 +3,41 @@
This version of the script uses the utilapplication framework and is
pared down for presentation on a series of slides
"""
-from starpy import manager
import utilapplication
from twisted.internet import reactor
-import sys, logging
+import logging
+
APPLICATION = utilapplication.UtilApplication()
-def main( channel = 'sip/4167290048 at testout', connectTo=('outgoing','s','1') ):
- df = APPLICATION.amiSpecifier.login()
- def onLogin( protocol ):
- """We've logged into the manager, generate a call and log off"""
- context, extension, priority = connectTo
- df = protocol.originate(
- channel,
- context,extension,priority,
- )
- def onFinished( result ):
- return protocol.logoff()
- df.addCallbacks( onFinished, onFinished )
- return df
- def onFailure( reason ):
- print reason.getTraceback()
- def onFinished( result ):
- reactor.stop()
- df.addCallbacks(
- onLogin, onFailure
- ).addCallbacks( onFinished, onFinished )
- return df
+
+def main(channel='sip/4167290048 at testout', connectTo=('outgoing', 's', '1')):
+ df = APPLICATION.amiSpecifier.login()
+
+ def onLogin(protocol):
+ """We've logged into the manager, generate a call and log off"""
+ context, extension, priority = connectTo
+ df = protocol.originate(
+ channel,
+ context, extension, priority,
+ )
+
+ def onFinished(result):
+ return protocol.logoff()
+ df.addCallbacks(onFinished, onFinished)
+ return df
+
+ def onFailure(reason):
+ print(reason.getTraceback())
+
+ def onFinished(result):
+ reactor.stop()
+ df.addCallbacks(
+ onLogin, onFailure
+ ).addCallbacks(onFinished, onFinished)
+ return df
+
if __name__ == "__main__":
- logging.basicConfig()
- reactor.callWhenRunning( main )
- reactor.run()
+ logging.basicConfig()
+ reactor.callWhenRunning(main)
+ reactor.run()
diff --git a/examples/fastagisetvariable.py b/examples/fastagisetvariable.py
index 44c3da7..3a4b774 100644
--- a/examples/fastagisetvariable.py
+++ b/examples/fastagisetvariable.py
@@ -3,26 +3,31 @@
from twisted.internet import reactor
from starpy import fastagi
import utilapplication
-import logging, time
+import logging
-log = logging.getLogger( 'hellofastagi' )
+log = logging.getLogger('hellofastagi')
-def testFunction( agi ):
- """Demonstrate simplistic use of the AGI interface with sequence of actions"""
- log.debug( 'testFunction' )
- def setX( ):
- return agi.setVariable( 'this"toset', 'That"2set' )
- def getX( result ):
- return agi.getVariable( 'this"toset' )
- def onX( value ):
- print 'Retrieved value', value
- reactor.stop()
- return setX().addCallback( getX ).addCallbacks( onX, onX )
+
+def testFunction(agi):
+ """Demonstrate simple use of the AGI interface with sequence of actions"""
+ log.debug('testFunction')
+
+ def setX():
+ return agi.setVariable('this"toset', 'That"2set')
+
+ def getX(result):
+ return agi.getVariable('this"toset')
+
+ def onX(value):
+ print('Retrieved value', value)
+ reactor.stop()
+ return setX().addCallback(getX).addCallbacks(onX, onX)
+
if __name__ == "__main__":
- logging.basicConfig()
- fastagi.log.setLevel( logging.DEBUG )
- APPLICATION = utilapplication.UtilApplication()
- APPLICATION.handleCallsFor( 's', testFunction )
- APPLICATION.agiSpecifier.run( APPLICATION.dispatchIncomingCall )
- reactor.run()
+ logging.basicConfig()
+ fastagi.log.setLevel(logging.DEBUG)
+ APPLICATION = utilapplication.UtilApplication()
+ APPLICATION.handleCallsFor('s', testFunction)
+ APPLICATION.agiSpecifier.run(APPLICATION.dispatchIncomingCall)
+ reactor.run()
diff --git a/examples/getvariable.py b/examples/getvariable.py
index 6929240..43a8e60 100644
--- a/examples/getvariable.py
+++ b/examples/getvariable.py
@@ -4,56 +4,63 @@
from twisted.internet import reactor
from starpy import fastagi
import utilapplication
-import logging, time, pprint
+import logging
+import pprint
-log = logging.getLogger( 'hellofastagi' )
+log = logging.getLogger('hellofastagi')
-def envVars( agi ):
- """Print out channel variables for display"""
- vars = [
- x.split( ' -- ' )[0].strip()
- for x in agi.getVariable.__doc__.splitlines()
- if len(x.split( ' -- ' )) == 2
- ]
- for var in vars:
- yield var
-def printVar( result, agi, vars ):
- """Print out the variables produced by envVars"""
- def doPrint( result, var ):
- print '%r -- %r'%( var, result )
- def notAvailable( reason, var ):
- print '%r -- UNDEFINED'%( var, )
- try:
- var = vars.next()
- except StopIteration, err:
- return None
- else:
- return agi.getVariable( var ).addCallback( doPrint, var ).addErrback(
- notAvailable, var,
- ).addCallback(
- printVar, agi, vars,
- )
-
+def envVars(agi):
+ """Print out channel variables for display"""
+ vars = [
+ x.split(' -- ')[0].strip()
+ for x in agi.getVariable.__doc__.splitlines()
+ if len(x.split(' -- ')) == 2
+ ]
+ for var in vars:
+ yield var
-def testFunction( agi ):
- """Print out known AGI variables"""
- log.debug( 'testFunction' )
- print 'AGI Variables'
- pprint.pprint( agi.variables )
- print 'Channel Variables'
- sequence = fastagi.InSequence()
- sequence.append( printVar, None, agi, envVars(agi) )
- sequence.append( agi.finish )
- def onFailure( reason ):
- log.error( "Failure: %s", reason.getTraceback())
- agi.finish()
- return sequence().addErrback( onFailure )
+
+def printVar(result, agi, vars):
+ """Print out the variables produced by envVars"""
+
+ def doPrint(result, var):
+ print('%r -- %r' % (var, result))
+
+ def notAvailable(reason, var):
+ print('%r -- UNDEFINED' % (var,))
+ try:
+ var = vars.next()
+ except StopIteration as err:
+ return None
+ else:
+ return agi.getVariable(var).addCallback(doPrint, var).addErrback(
+ notAvailable, var,
+ ).addCallback(
+ printVar, agi, vars,
+ )
+
+
+def testFunction(agi):
+ """Print out known AGI variables"""
+ log.debug('testFunction')
+ print('AGI Variables')
+ pprint.pprint(agi.variables)
+ print('Channel Variables')
+ sequence = fastagi.InSequence()
+ sequence.append(printVar, None, agi, envVars(agi))
+ sequence.append(agi.finish)
+
+ def onFailure(reason):
+ log.error("Failure: %s", reason.getTraceback())
+ agi.finish()
+ return sequence().addErrback(onFailure)
+
if __name__ == "__main__":
- logging.basicConfig()
- #fastagi.log.setLevel( logging.DEBUG )
- APPLICATION = utilapplication.UtilApplication()
- APPLICATION.handleCallsFor( 's', testFunction )
- APPLICATION.agiSpecifier.run( APPLICATION.dispatchIncomingCall )
- reactor.run()
+ logging.basicConfig()
+ # fastagi.log.setLevel(logging.DEBUG)
+ APPLICATION = utilapplication.UtilApplication()
+ APPLICATION.handleCallsFor('s', testFunction)
+ APPLICATION.agiSpecifier.run(APPLICATION.dispatchIncomingCall)
+ reactor.run()
diff --git a/examples/hellofastagi.py b/examples/hellofastagi.py
index 1434dad..f491ca4 100644
--- a/examples/hellofastagi.py
+++ b/examples/hellofastagi.py
@@ -2,24 +2,30 @@
"""Simple FastAGI server using starpy"""
from twisted.internet import reactor
from starpy import fastagi
-import logging, time
+import logging
+import time
-log = logging.getLogger( 'hellofastagi' )
-def testFunction( agi ):
- """Demonstrate simplistic use of the AGI interface with sequence of actions"""
- log.debug( 'testFunction' )
- sequence = fastagi.InSequence()
- sequence.append( agi.sayDateTime, time.time() )
- sequence.append( agi.finish )
- def onFailure( reason ):
- log.error( "Failure: %s", reason.getTraceback())
- agi.finish()
- return sequence().addErrback( onFailure )
+log = logging.getLogger('hellofastagi')
+
+
+def testFunction(agi):
+ """Demonstrate simple use of the AGI interface with sequence of actions"""
+ log.debug('testFunction')
+ sequence = fastagi.InSequence()
+ sequence.append(agi.sayDateTime, time.time())
+ sequence.append(agi.finish)
+
+ def onFailure(reason):
+ log.error("Failure: %s", reason.getTraceback())
+ agi.finish()
+ return sequence().addErrback(onFailure)
+
if __name__ == "__main__":
- logging.basicConfig()
- fastagi.log.setLevel( logging.DEBUG )
- f = fastagi.FastAGIFactory(testFunction)
- reactor.listenTCP(4573, f, 50, '127.0.0.1') # only binding on local interface
- reactor.run()
+ logging.basicConfig()
+ fastagi.log.setLevel(logging.DEBUG)
+ f = fastagi.FastAGIFactory(testFunction)
+ # only binding on local interface
+ reactor.listenTCP(4573, f, 50, '127.0.0.1')
+ reactor.run()
diff --git a/examples/hellofastagiapp.py b/examples/hellofastagiapp.py
index 51bc06c..6ed80be 100644
--- a/examples/hellofastagiapp.py
+++ b/examples/hellofastagiapp.py
@@ -7,25 +7,29 @@
from twisted.internet import reactor
from starpy import fastagi
import utilapplication
-import logging, time
+import logging
+import time
-log = logging.getLogger( 'hellofastagi' )
+log = logging.getLogger('hellofastagi')
-def testFunction( agi ):
- """Demonstrate simplistic use of the AGI interface with sequence of actions"""
- log.debug( 'testFunction' )
- sequence = fastagi.InSequence()
- sequence.append( agi.sayDateTime, time.time() )
- sequence.append( agi.finish )
- def onFailure( reason ):
- log.error( "Failure: %s", reason.getTraceback())
- agi.finish()
- return sequence().addErrback( onFailure )
+
+def testFunction(agi):
+ """Demonstrate simple use of the AGI interface with sequence of actions"""
+ log.debug('testFunction')
+ sequence = fastagi.InSequence()
+ sequence.append(agi.sayDateTime, time.time())
+ sequence.append(agi.finish)
+
+ def onFailure(reason):
+ log.error("Failure: %s", reason.getTraceback())
+ agi.finish()
+ return sequence().addErrback(onFailure)
+
if __name__ == "__main__":
- logging.basicConfig()
- fastagi.log.setLevel( logging.DEBUG )
- APPLICATION = utilapplication.UtilApplication()
- APPLICATION.handleCallsFor( 's', testFunction )
- APPLICATION.agiSpecifier.run( APPLICATION.dispatchIncomingCall )
- reactor.run()
+ logging.basicConfig()
+ fastagi.log.setLevel(logging.DEBUG)
+ APPLICATION = utilapplication.UtilApplication()
+ APPLICATION.handleCallsFor('s', testFunction)
+ APPLICATION.agiSpecifier.run(APPLICATION.dispatchIncomingCall)
+ reactor.run()
diff --git a/examples/menu.py b/examples/menu.py
index a847c2d..847e255 100644
--- a/examples/menu.py
+++ b/examples/menu.py
@@ -1,6 +1,6 @@
#
# StarPy -- Asterisk Protocols for Twisted
-#
+#
# Copyright (c) 2006, Michael C. Fletcher
#
# Michael C. Fletcher <mcfletch at vrplumber.com>
@@ -29,34 +29,38 @@
XXX add the reject/accept menus to the CollectDigits (requires soundfiles
in standard locations on the server, complicates install)
"""
-from twisted.application import service, internet
-from twisted.internet import reactor, defer
-from starpy import manager, fastagi, error
-import utilapplication
-import os, logging, pprint, time
+from twisted.internet import defer
+from starpy import error
+import os
+import sys
+import logging
from basicproperty import common, propertied, basic
log = logging.getLogger('menu')
log.setLevel(logging.DEBUG)
+
class Interaction(propertied.Propertied):
"""Base class for user-interaction operations"""
ALL_DIGITS = '0123456789*#'
timeout = common.FloatProperty(
"timeout", """Duration to wait for response before repeating message""",
- defaultValue = 5,
+ defaultValue=5,
)
maxRepetitions = common.IntegerProperty(
"maxRepetitions", """Maximum number of times to play before failure""",
- defaultValue = 5,
+ defaultValue=5,
)
onSuccess = basic.BasicProperty(
- "onSuccess", """Optional callback for success with signature method( result, runner )""",
+ "onSuccess",
+ """Optional callback for success with signature method( result, runner )""",
)
onFailure = basic.BasicProperty(
- "onFailure", """Optional callback for failure with signature method( result, runner )""",
+ "onFailure",
+ """Optional callback for failure with signature method( result, runner )""",
)
runnerClass = None
+
def __call__(self, agi, *args, **named):
"""Initiate AGI-based interaction with the user"""
return self.runnerClass(model=self, agi=agi)(*args, **named)
@@ -67,6 +71,7 @@
agi = basic.BasicProperty(
"agi", """The AGI instance we use to communicate with the user""",
)
+
def defaultFinalDF(prop, client):
"""Produce the default finalDF with onSuccess/onFailure support"""
df = defer.Deferred()
@@ -80,13 +85,13 @@
return df
finalDF = basic.BasicProperty(
"finalDF", """Final deferred we will callback/errback on success/failure""",
- defaultFunction = defaultFinalDF,
+ defaultFunction=defaultFinalDF,
)
del defaultFinalDF
alreadyRepeated = common.IntegerProperty(
"alreadyRepeated", """Number of times we've repeated the message...""",
- defaultValue = 0,
+ defaultValue=0,
)
model = basic.BasicProperty(
"model", """The data-model that we are presenting to the user (e.g. Menu)""",
@@ -94,7 +99,7 @@
def returnResult(self, result):
"""Return result of deferred to our original caller"""
- log.debug('returnResult: %s %s', self.model,result)
+ log.debug('returnResult: %s %s', self.model, result)
if not self.finalDF.called:
self.finalDF.debug = True
self.finalDF.callback(result)
@@ -117,20 +122,23 @@
"""Take set of prompt-compatible objects and produce a PromptRunner for them"""
realPrompt = []
for p in prompt:
- if isinstance(p, (str, unicode)):
+ if isinstance(p, str):
+ p = AudioPrompt(p)
+ elif sys.version_info <= (3, 0) and isinstance(p, unicode): # noqa
p = AudioPrompt(p)
elif isinstance(p, int):
p = NumberPrompt(p)
elif not isinstance(p, Prompt):
- raise TypeError( """Unknown prompt element type on %r: %s"""%(
- p, p.__class__,
- ))
+ raise TypeError(
+ """Unknown prompt element type on %r: %s""" % (
+ p, p.__class__, )
+ )
realPrompt.append(p)
return PromptRunner(
- elements = realPrompt,
- escapeDigits = self.escapeDigits,
- agi = self.agi,
- timeout = self.model.timeout,
+ elements=realPrompt,
+ escapeDigits=self.escapeDigits,
+ agi=self.agi,
+ timeout=self.model.timeout,
)
@@ -151,7 +159,7 @@
# easiest possibility, just read out the file...
return self.agi.getData(
soundFile, timeout=self.model.timeout,
- maxDigits = getattr(self.model, 'maxDigits', None),
+ maxDigits=getattr(self.model, 'maxDigits', None),
).addCallback(self.onReadDigits).addErrback(self.returnError)
else:
raise NotImplemented("""Haven't got non-soundfile menus working yet""")
@@ -166,8 +174,9 @@
return False, 'Too few digits'
return True, None
- def onReadDigits(self, (digits,timeout)):
+ def onReadDigits(self, values):
"""Deal with succesful result from reading digits"""
+ (digits, timeout) = values
log.info("""onReadDigits: %r, %s""", digits, timeout)
valid, reason = self.validEntry(digits)
if (not digits) and (not timeout):
@@ -183,7 +192,8 @@
pass
self.alreadyRepeated += 1
if self.alreadyRepeated >= self.model.maxRepetitions:
- log.warn("""User did not complete digit-entry for %s, timing out""", self.model)
+ log.warn("""User did not complete digit-entry for %s, timing out""",
+ self.model)
raise error.MenuTimeout(
self.model,
"""User did not finish digit-entry in %s passes of collection""" % (
@@ -201,6 +211,7 @@
expected = common.StringLocaleProperty(
"expected", """The value expected/required from the user for this run""",
)
+
def __call__(self, expected, *args, **named):
"""Begin the AGI processing for the menu"""
self.expected = expected
@@ -223,9 +234,10 @@
"""Audio-collection runner, records user audio to a file on the asterisk server"""
escapeDigits = common.StringLocaleProperty(
"escapeDigits", """Set of digits which escape from recording""",
- defaultFunction = lambda prop, client: client.model.escapeDigits,
- setDefaultOnGet = False,
+ defaultFunction=lambda prop, client: client.model.escapeDigits,
+ setDefaultOnGet=False,
)
+
def __call__(self, *args, **named):
"""Begin the AGI processing for the menu"""
self.readPrompt()
@@ -252,7 +264,7 @@
else:
return self.collectAudio()
- def collectAudio( self ):
+ def collectAudio(self):
"""We're supposed to record audio from the user with our model's parameters"""
# XXX use a temporary file for recording the audio, then move to final destination
log.debug('collectAudio')
@@ -280,8 +292,8 @@
digits, typeOfExit, endpos = result
if typeOfExit in ('hangup', 'timeout'):
# expected common-case for recording...
- return self.returnResult((self,(digits,typeOfExit,endpos)))
- elif typeOfExit =='dtmf':
+ return self.returnResult((self, (digits, typeOfExit, endpos)))
+ elif typeOfExit == 'dtmf':
raise error.MenuExit(
self.model,
"""User cancelled entry of audio""",
@@ -297,7 +309,7 @@
"""Failure collecting audio for CollectAudio instance %s: %s""",
self.model, reason.getTraceback(),
)
- return reason # re-raise the error...
+ return reason # re-raise the error...
def moveToFinal(self, result):
"""On succesful recording, move temporaryFile to final file"""
@@ -305,13 +317,12 @@
'Moving recorded audio %r to final destination %r',
self.model.temporaryFile, self.model.filename
)
- import os
try:
os.rename(
'%s.%s' % (self.model.temporaryFile, self.model.format),
'%s.%s' % (self.model.filename, self.model.format),
)
- except (OSError, IOError), err:
+ except (OSError, IOError) as err:
log.error(
"""Unable to move temporary recording file %r to target file %r: %s""",
self.model.temporaryFile, self.model.filename,
@@ -324,6 +335,7 @@
class MenuRunner(Runner):
"""User's single interaction with a given menu"""
+
def defaultEscapeDigits(prop, client):
"""Return the default escape digits for the given client"""
if client.model.tellInvalid:
@@ -333,9 +345,9 @@
return escapeDigits
escapeDigits = common.StringLocaleProperty(
"escapeDigits", """Set of digits which escape from prompts to choose option""",
- defaultFunction = defaultEscapeDigits,
+ defaultFunction=defaultEscapeDigits,
)
- del defaultEscapeDigits # clean up namespace
+ del defaultEscapeDigits # clean up namespace
def __call__(self, *args, **named):
"""Begin the AGI processing for the menu"""
@@ -353,7 +365,8 @@
if not pressed:
self.alreadyRepeated += 1
if self.alreadyRepeated >= self.model.maxRepetitions:
- log.warn("""User did not complete menu selection for %s, timing out""", self.model)
+ log.warn("""User did not complete menu selection for %s, timing out""",
+ self.model)
if not self.finalDF.called:
raise error.MenuTimeout(
self.model,
@@ -374,15 +387,16 @@
self.returnResult, self.returnError
)
elif hasattr(option, 'onSuccess'):
- return defer.maybeDeferred(option.onSuccess, pressed, self).addCallbacks(
- self.returnResult, self.returnError
- )
+ return defer.maybeDeferred(option.onSuccess, pressed, self) \
+ .addCallbacks(self.returnResult, self.returnError)
else:
- return self.returnResult([(option,pressed),])
+ return self.returnResult([(option, pressed), ])
# but it wasn't anything we expected...
if not self.model.tellInvalid:
raise error.MenuUnexpectedOption(
- self.model, """User somehow selected %r, which isn't a recognised option?""" % (pressed,),
+ self.model,
+ "User somehow selected %r, which isn't a recognised option?" % (
+ pressed,),
)
else:
return self.agi.getOption(
@@ -424,8 +438,9 @@
"options", """Set of options the user may select""",
)
tellInvalid = common.IntegerProperty(
- "tellInvalid", """Whether to tell the user that their selection is unrecognised""",
- defaultValue = True,
+ "tellInvalid",
+ """Whether to tell the user that their selection is unrecognised""",
+ defaultValue=True,
)
runnerClass = MenuRunner
@@ -442,11 +457,12 @@
menu = basic.BasicProperty(
"menu", """The sub-menu we are presenting to the user""",
)
+
def __call__(self, pressed, parent):
"""Get result from the sub-menu, add ourselves into the result"""
def onResult(result):
log.debug("""Child menu %s result: %s""", self.menu, result)
- result.insert(0, (self,pressed))
+ result.insert(0, (self, pressed))
return result
def onFailure(reason):
@@ -460,6 +476,7 @@
class ExitOn(Option):
"""An option which exits from the current menu level"""
+
def __call__(self, pressed, parent):
"""Raise a MenuExit error"""
raise error.MenuExit(
@@ -477,18 +494,21 @@
)
readBack = common.BooleanProperty(
"readBack", """Whether to read the entered value back to the user""",
- defaultValue = False,
+ defaultValue=False,
)
minDigits = common.IntegerProperty(
- "minDigits", """Minimum number of digits to collect (only restricted if specified)""",
+ "minDigits",
+ """Minimum number of digits to collect (only restricted if specified)""",
)
maxDigits = common.IntegerProperty(
- "maxDigits", """Maximum number of digits to collect (only restricted if specified)""",
+ "maxDigits",
+ """Maximum number of digits to collect (only restricted if specified)""",
)
runnerClass = CollectDigitsRunner
tellInvalid = common.IntegerProperty(
- "tellInvalid", """Whether to tell the user that their selection is unrecognised""",
- defaultValue = True,
+ "tellInvalid",
+ """Whether to tell the user that their selection is unrecognised""",
+ defaultValue=True,
)
@@ -497,11 +517,11 @@
runnerClass = CollectPasswordRunner
escapeDigits = common.StringLocaleProperty(
"escapeDigits", """Set of digits which escape from password entry""",
- defaultValue = '',
+ defaultValue='',
)
soundFile = common.StringLocaleProperty(
"soundFile", """File (name) for the pre-recorded blurb""",
- defaultValue = 'vm-password',
+ defaultValue='vm-password',
)
@@ -517,30 +537,31 @@
"textPrompt", """Textual prompt describing the option""",
)
temporaryFile = common.StringLocaleProperty(
- "temporaryFile", """Temporary file into which to record the audio before moving to filename""",
+ "temporaryFile",
+ """Temporary file into which to record the audio before moving to filename""",
)
filename = common.StringLocaleProperty(
"filename", """Final filename into which to record the file...""",
)
deleteOnFail = common.BooleanProperty(
"deleteOnFail", """Whether to delete failed attempts to record a file""",
- defaultValue = True
+ defaultValue=True
)
escapeDigits = common.StringLocaleProperty(
"escapeDigits", """Set of digits which escape from recording the file""",
- defaultValue = '#*0123456789',
+ defaultValue='#*0123456789',
)
timeout = common.FloatProperty(
"timeout", """Duration to wait for recording (maximum record time)""",
- defaultValue = 60,
+ defaultValue=60,
)
silence = common.FloatProperty(
"silence", """Duration to wait for recording (maximum record time)""",
- defaultValue = 5,
+ defaultValue=5,
)
beep = common.BooleanProperty(
"beep", """Whether to play a "beep" sound at beginning of recording""",
- defaultValue = True,
+ defaultValue=True,
)
runnerClass = CollectAudioRunner
@@ -560,6 +581,7 @@
timeout = common.FloatProperty(
"timeout", """Timeout on data-entry after completed reading""",
)
+
def __call__(self):
"""Return a deferred that chains all of the sub-prompts in order
@@ -575,7 +597,7 @@
return result
try:
element = self.elements[index]
- except IndexError, err:
+ except IndexError as err:
# okay, do a waitForDigit from timeout seconds...
return self.agi.waitForDigit(self.timeout).addCallback(
self.processKey
@@ -592,7 +614,7 @@
# getOption result...
if result[1] == 0:
# failure during load of the file...
- log.warn("""Apparent failure during load of audio file: %s""", self.value)
+ log.warn("Apparent failure during load of audio file: %s", self.value)
result = 0
else:
result = result[0]
@@ -601,7 +623,7 @@
result = ord(result)
else:
result = 0
- if result: # None or 0
+ if result: # None or 0
# User pressed a key during the reading...
key = chr(result)
if key in self.escapeDigits:
@@ -613,7 +635,7 @@
# completed reading without any escape digits, continue reading
return None
- def processLast(self,result):
+ def processLast(self, result):
if result is None:
result = ''
return result
@@ -624,6 +646,7 @@
value = basic.BasicProperty(
"value", """Filename to be read to the user""",
)
+
def __init__(self, value, **named):
named['value'] = value
super(Prompt, self).__init__(**named)
@@ -648,6 +671,7 @@
value = common.IntegerProperty(
"value", """Integer numeral to read""",
)
+
def read(self, agi, escapeDigits):
"""Read the audio prompt to the user"""
return agi.sayNumber(self.value, escapeDigits)
@@ -671,8 +695,9 @@
"""Prompt that reads a date/time as a date"""
format = basic.BasicProperty(
"format", """Format in which to read the date to the user""",
- defaultValue = None
+ defaultValue=None
)
+
def read(self, agi, escapeDigits):
"""Read the audio prompt to the user"""
return agi.sayDateTime(self.value, escapeDigits, format=self.format)
diff --git a/examples/menutest.py b/examples/menutest.py
index 55d6726..e7a1ab0 100644
--- a/examples/menutest.py
+++ b/examples/menutest.py
@@ -1,81 +1,80 @@
#! /usr/bin/env python
"""Sample application to test the menuing utility classes"""
-from twisted.application import service, internet
-from twisted.internet import reactor, defer
-from starpy import manager, fastagi, error
+from twisted.internet import reactor
+from starpy import fastagi
import utilapplication
import menu
-import os, logging, pprint, time
+import logging
-log = logging.getLogger( 'menutest' )
+log = logging.getLogger('menutest')
mainMenu = menu.Menu(
- prompt = '/home/mcfletch/starpydemo/soundfiles/menutest-toplevel',
- #prompt = 'houston',
- textPrompt = '''Top level of the menu test example
-
- Pressing Star will exit this menu at any time.
- Options zero and pound will exit with those options selected.
- Option one will start a submenu.
- Option two will start a digit-collecting sub-menu.
- We'll tell you if you make an invalid selection here.''',
- options = [
- menu.Option( option='0' ),
- menu.Option( option='#' ),
- menu.ExitOn( option='*' ),
- menu.SubMenu(
- option='1',
- menu = menu.Menu(
- prompt = '/home/mcfletch/starpydemo/soundfiles/menutest-secondlevel',
- #prompt = 'atlantic',
- textPrompt = '''A second-level menu in the menu test example
-
- Pressing Star will exit this menu at any time.
- Options zero and pound will exit the whole menu with those options selected.
- We won't tell you if you make an invalid selection here.
- ''',
- tellInvalid = False, # don't report incorrect selections
- options = [
- menu.Option( option='0' ),
- menu.Option( option='#' ),
- menu.ExitOn( option='*' ),
- ],
- ),
- ),
- menu.SubMenu(
- option='2',
- menu = menu.CollectDigits(
- textPrompt = '''Digit collection example,
- Please enter three to 5 digits.
- ''',
- soundFile = '/home/mcfletch/starpydemo/soundfiles/menutest-digits',
- #soundFile = 'extension',
- maxDigits = 5,
- minDigits = 3,
- ),
- ),
- ],
+ prompt='/home/mcfletch/starpydemo/soundfiles/menutest-toplevel',
+ textPrompt='''Top level of the menu test example
+
+ Pressing Star will exit this menu at any time.
+ Options zero and pound will exit with those options selected.
+ Option one will start a submenu.
+ Option two will start a digit-collecting sub-menu.
+ We'll tell you if you make an invalid selection here.''',
+ options=[
+ menu.Option(option='0'),
+ menu.Option(option='#'),
+ menu.ExitOn(option='*'),
+ menu.SubMenu(
+ option='1',
+ menu=menu.Menu(
+ prompt='/home/mcfletch/starpydemo/soundfiles/menutest-secondlevel',
+ textPrompt='''A second-level menu in the menu test example
+
+ Pressing Star will exit this menu at any time. Options zero
+ and pound will exit the whole menu with those options selected.
+ We won't tell you if you make an invalid selection here.
+ ''',
+ tellInvalid=False, # don't report incorrect selections
+ options=[
+ menu.Option(option='0'),
+ menu.Option(option='#'),
+ menu.ExitOn(option='*'),
+ ],
+ ),
+ ),
+ menu.SubMenu(
+ option='2',
+ menu=menu.CollectDigits(
+ textPrompt='''Digit collection example,
+ Please enter three to 5 digits.
+ ''',
+ soundFile='/home/mcfletch/starpydemo/soundfiles/menutest-digits',
+ maxDigits=5,
+ minDigits=3,
+ ),
+ ),
+ ],
)
-class Application( utilapplication.UtilApplication ):
- """Application for the call duration callback mechanism"""
- def onS( self, agi ):
- """Incoming AGI connection to the "s" extension (start operation)"""
- log.info( """New call tracker""" )
- def onComplete( result ):
- log.info( """Final result: %r""", result )
- agi.finish()
- return mainMenu( agi ).addCallbacks( onComplete, onComplete )
+
+class Application(utilapplication.UtilApplication):
+ """Application for the call duration callback mechanism"""
+
+ def onS(self, agi):
+ """Incoming AGI connection to the "s" extension (start operation)"""
+ log.info("""New call tracker""")
+
+ def onComplete(result):
+ log.info("""Final result: %r""", result)
+ agi.finish()
+ return mainMenu(agi).addCallbacks(onComplete, onComplete)
+
APPLICATION = Application()
if __name__ == "__main__":
- logging.basicConfig()
- log.setLevel( logging.DEBUG )
- #manager.log.setLevel( logging.DEBUG )
- fastagi.log.setLevel( logging.DEBUG )
- menu.log.setLevel( logging.DEBUG )
- APPLICATION.handleCallsFor( 's', APPLICATION.onS )
- APPLICATION.agiSpecifier.run( APPLICATION.dispatchIncomingCall )
- from twisted.internet import reactor
- reactor.run()
+ logging.basicConfig()
+ log.setLevel(logging.DEBUG)
+ # manager.log.setLevel(logging.DEBUG)
+ fastagi.log.setLevel(logging.DEBUG)
+ menu.log.setLevel(logging.DEBUG)
+ APPLICATION.handleCallsFor('s', APPLICATION.onS)
+ APPLICATION.agiSpecifier.run(APPLICATION.dispatchIncomingCall)
+ reactor.run()
diff --git a/examples/priexhaustion.py b/examples/priexhaustion.py
index 6f70cae..167f0cd 100644
--- a/examples/priexhaustion.py
+++ b/examples/priexhaustion.py
@@ -2,99 +2,104 @@
"""Sample application to watch for PRI exhaustion
This script watches for events on the AMI interface, tracking the identity of
-open channels in order to track how many channels are being used. This would
-be used to send messages to an administrator when network capacity is being
+open channels in order to track how many channels are being used. This would
+be used to send messages to an administrator when network capacity is being
approached.
-Similarly, you could watch for spare capacity on the network and use that
+Similarly, you could watch for spare capacity on the network and use that
to decide whether to allow low-priority calls, such as peering framework or
free-world-dialup calls to go through.
"""
-from twisted.application import service, internet
-from twisted.internet import reactor, defer
-from starpy import manager, fastagi
+from twisted.internet import reactor
import utilapplication
-import menu
-import os, logging, pprint, time
-from basicproperty import common, propertied, basic
+import logging
+from basicproperty import common, propertied
-log = logging.getLogger( 'priexhaustion' )
-log.setLevel( logging.INFO )
+log = logging.getLogger('priexhaustion')
+log.setLevel(logging.INFO)
-class ChannelTracker( propertied.Propertied ):
- """Track open channels on the Asterisk server"""
- channels = common.DictionaryProperty(
- "channels", """Set of open channels on the system""",
- )
- thresholdCount = common.IntegerProperty(
- "thresholdCount", """Storage of threshold below which we don't warn user""",
- defaultValue = 20,
- )
- def main( self ):
- """Main operation for the channel-tracking demo"""
- amiDF = APPLICATION.amiSpecifier.login(
- ).addCallback( self.onAMIConnect )
- # XXX do something useful on failure to login...
- def onAMIConnect( self, ami ):
- """Register for AMI events"""
- # XXX should do an initial query to populate channels...
- # XXX should handle asterisk reboots (at the moment the AMI
- # interface will just stop generating events), not a practical
- # problem at the moment, but should have a periodic check to be sure
- # the interface is still up, and if not, should close and restart
- log.debug( 'onAMIConnect' )
- ami.status().addCallback( self.onStatus, ami=ami )
- ami.registerEvent( 'Hangup', self.onChannelHangup )
- ami.registerEvent( 'Newchannel', self.onChannelNew )
- def interestingEvent( self, event, ami=None ):
- """Decide whether this channel event is interesting
-
- Real-world application would want to take only Zap channels, or only
- channels from a given context, or whatever other filter you want in
- order to capture *just* the scarce resource (such as PRI lines).
-
- Keep in mind that an "interesting" event must show up as interesting
- for *both* Newchannel and Hangup events or you will leak
- references/channels or have unknown channels hanging up.
- """
- return True
- def onStatus( self, events, ami=None ):
- """Integrate the current status into our set of channels"""
- log.debug( """Initial channel status retrieved""" )
- for event in events:
- self.onChannelNew( ami, event )
- def onChannelNew( self, ami, event ):
- """Handle creation of a new channel"""
- log.debug( """Start on channel %s""", event )
- if self.interestingEvent( event, ami ):
- opening = not self.channels.has_key( event['uniqueid'] )
- self.channels[ event['uniqueid'] ] = event
- if opening:
- self.onChannelChange( ami, event, opening = opening )
- def onChannelHangup( self, ami, event ):
- """Handle hangup of an existing channel"""
- if self.interestingEvent( event, ami ):
- try:
- del self.channels[ event['uniqueid']]
- except KeyError, err:
- log.warn( """Hangup on unknown channel %s""", event )
- else:
- log.debug( """Hangup on channel %s""", event )
- self.onChannelChange( ami, event, opening = False )
- def onChannelChange( self, ami, event, opening=False ):
- """Channel count has changed, do something useful like enforcing limits"""
- if opening and len(self.channels) > self.thresholdCount:
- log.warn( """Current channel count: %s""", len(self.channels ) )
- else:
- log.info( """Current channel count: %s""", len(self.channels ) )
+
+class ChannelTracker(propertied.Propertied):
+ """Track open channels on the Asterisk server"""
+ channels = common.DictionaryProperty(
+ "channels", """Set of open channels on the system""",
+ )
+ thresholdCount = common.IntegerProperty(
+ "thresholdCount", """Storage of threshold below which we don't warn user""",
+ defaultValue=20,
+ )
+
+ def main(self):
+ """Main operation for the channel-tracking demo"""
+ APPLICATION.amiSpecifier.login().addCallback(self.onAMIConnect)
+ # XXX do something useful on failure to login...
+
+ def onAMIConnect(self, ami):
+ """Register for AMI events"""
+ # XXX should do an initial query to populate channels...
+ # XXX should handle asterisk reboots (at the moment the AMI
+ # interface will just stop generating events), not a practical
+ # problem at the moment, but should have a periodic check to be sure
+ # the interface is still up, and if not, should close and restart
+ log.debug('onAMIConnect')
+ ami.status().addCallback(self.onStatus, ami=ami)
+ ami.registerEvent('Hangup', self.onChannelHangup)
+ ami.registerEvent('Newchannel', self.onChannelNew)
+
+ def interestingEvent(self, event, ami=None):
+ """Decide whether this channel event is interesting
+
+ Real-world application would want to take only Zap channels, or only
+ channels from a given context, or whatever other filter you want in
+ order to capture *just* the scarce resource (such as PRI lines).
+
+ Keep in mind that an "interesting" event must show up as interesting
+ for *both* Newchannel and Hangup events or you will leak
+ references/channels or have unknown channels hanging up.
+ """
+ return True
+
+ def onStatus(self, events, ami=None):
+ """Integrate the current status into our set of channels"""
+ log.debug("""Initial channel status retrieved""")
+ for event in events:
+ self.onChannelNew(ami, event)
+
+ def onChannelNew(self, ami, event):
+ """Handle creation of a new channel"""
+ log.debug("""Start on channel %s""", event)
+ if self.interestingEvent(event, ami):
+ opening = event['uniqueid'] not in self.channels
+ self.channels[event['uniqueid']] = event
+ if opening:
+ self.onChannelChange(ami, event, opening=opening)
+
+ def onChannelHangup(self, ami, event):
+ """Handle hangup of an existing channel"""
+ if self.interestingEvent(event, ami):
+ try:
+ del self.channels[event['uniqueid']]
+ except KeyError as err:
+ log.warn("""Hangup on unknown channel %s""", event)
+ else:
+ log.debug("""Hangup on channel %s""", event)
+ self.onChannelChange(ami, event, opening=False)
+
+ def onChannelChange(self, ami, event, opening=False):
+ """Channel count has changed, do something useful like enforcing limits"""
+ if opening and len(self.channels) > self.thresholdCount:
+ log.warn("""Current channel count: %s""", len(self.channels))
+ else:
+ log.info("""Current channel count: %s""", len(self.channels))
+
APPLICATION = utilapplication.UtilApplication()
if __name__ == "__main__":
- logging.basicConfig()
- #log.setLevel( logging.DEBUG )
- #manager.log.setLevel( logging.DEBUG )
- #fastagi.log.setLevel( logging.DEBUG )
- tracker = ChannelTracker()
- reactor.callWhenRunning( tracker.main )
- reactor.run()
+ logging.basicConfig()
+ # log.setLevel(logging.DEBUG)
+ # manager.log.setLevel(logging.DEBUG)
+ # fastagi.log.setLevel(logging.DEBUG)
+ tracker = ChannelTracker()
+ reactor.callWhenRunning(tracker.main)
+ reactor.run()
diff --git a/examples/priexhaustionbare.py b/examples/priexhaustionbare.py
index 4e0290b..450bf02 100644
--- a/examples/priexhaustionbare.py
+++ b/examples/priexhaustionbare.py
@@ -1,64 +1,69 @@
#! /usr/bin/env python
-from twisted.application import service, internet
-from twisted.internet import reactor, defer
-from starpy import manager, fastagi
+from twisted.internet import reactor
import utilapplication
-import menu
-import os, logging, pprint, time
-from basicproperty import common, propertied, basic
+import logging
+from basicproperty import common, propertied
-log = logging.getLogger( 'priexhaustion' )
-log.setLevel( logging.INFO )
+log = logging.getLogger('priexhaustion')
+log.setLevel(logging.INFO)
-class ChannelTracker( propertied.Propertied ):
- """Track open channels on the Asterisk server"""
- channels = common.DictionaryProperty(
- "channels", """Set of open channels on the system""",
- )
- thresholdCount = common.IntegerProperty(
- "thresholdCount", """Storage of threshold below which we don't warn user""",
- defaultValue = 20,
- )
- def main( self ):
- """Main operation for the channel-tracking demo"""
- amiDF = APPLICATION.amiSpecifier.login(
- ).addCallback( self.onAMIConnect )
- def onAMIConnect( self, ami ):
- ami.status().addCallback( self.onStatus, ami=ami )
- ami.registerEvent( 'Hangup', self.onChannelHangup )
- ami.registerEvent( 'Newchannel', self.onChannelNew )
- def onStatus( self, events, ami=None ):
- """Integrate the current status into our set of channels"""
- log.debug( """Initial channel status retrieved""" )
- for event in events:
- self.onChannelNew( ami, event )
- def onChannelNew( self, ami, event ):
- """Handle creation of a new channel"""
- log.debug( """Start on channel %s""", event )
- opening = not self.channels.has_key( event['uniqueid'] )
- self.channels[ event['uniqueid'] ] = event
- if opening:
- self.onChannelChange( ami, event, opening = opening )
- def onChannelHangup( self, ami, event ):
- """Handle hangup of an existing channel"""
- try:
- del self.channels[ event['uniqueid']]
- except KeyError, err:
- log.warn( """Hangup on unknown channel %s""", event )
- else:
- log.debug( """Hangup on channel %s""", event )
- self.onChannelChange( ami, event, opening = False )
- def onChannelChange( self, ami, event, opening=False ):
- """Channel count has changed, do something useful like enforcing limits"""
- if opening and len(self.channels) > self.thresholdCount:
- log.warn( """Current channel count: %s""", len(self.channels ) )
- else:
- log.info( """Current channel count: %s""", len(self.channels ) )
+
+class ChannelTracker(propertied.Propertied):
+ """Track open channels on the Asterisk server"""
+ channels = common.DictionaryProperty(
+ "channels", """Set of open channels on the system""",
+ )
+ thresholdCount = common.IntegerProperty(
+ "thresholdCount", """Storage of threshold below which we don't warn user""",
+ defaultValue=20,
+ )
+
+ def main(self):
+ """Main operation for the channel-tracking demo"""
+ APPLICATION.amiSpecifier.login().addCallback(self.onAMIConnect)
+
+ def onAMIConnect(self, ami):
+ ami.status().addCallback(self.onStatus, ami=ami)
+ ami.registerEvent('Hangup', self.onChannelHangup)
+ ami.registerEvent('Newchannel', self.onChannelNew)
+
+ def onStatus(self, events, ami=None):
+ """Integrate the current status into our set of channels"""
+ log.debug("""Initial channel status retrieved""")
+ for event in events:
+ self.onChannelNew(ami, event)
+
+ def onChannelNew(self, ami, event):
+ """Handle creation of a new channel"""
+ log.debug("""Start on channel %s""", event)
+ opening = event['uniqueid'] not in self.channels
+ self.channels[event['uniqueid']] = event
+ if opening:
+ self.onChannelChange(ami, event, opening=opening)
+
+ def onChannelHangup(self, ami, event):
+ """Handle hangup of an existing channel"""
+ try:
+ del self.channels[event['uniqueid']]
+ except KeyError as err:
+ log.warn("""Hangup on unknown channel %s""", event)
+ else:
+ log.debug("""Hangup on channel %s""", event)
+ self.onChannelChange(ami, event, opening=False)
+
+ def onChannelChange(self, ami, event, opening=False):
+ """Channel count has changed, do something useful like enforcing limits"""
+ if opening and len(self.channels) > self.thresholdCount:
+ log.warn("""Current channel count: %s""", len(self.channels))
+ else:
+ log.info("""Current channel count: %s""", len(self.channels))
+
APPLICATION = utilapplication.UtilApplication()
+
if __name__ == "__main__":
- logging.basicConfig()
- tracker = ChannelTracker()
- reactor.callWhenRunning( tracker.main )
- reactor.run()
+ logging.basicConfig()
+ tracker = ChannelTracker()
+ reactor.callWhenRunning(tracker.main)
+ reactor.run()
diff --git a/examples/readingdigits.py b/examples/readingdigits.py
index 093f1be..5f2895b 100644
--- a/examples/readingdigits.py
+++ b/examples/readingdigits.py
@@ -1,60 +1,70 @@
#! /usr/bin/env python
"""Read digits from the user in various ways..."""
-from twisted.internet import reactor, defer
-from starpy import fastagi, error
-import logging, time
+from twisted.internet import reactor
+from starpy import fastagi
+import logging
-log = logging.getLogger( 'hellofastagi' )
+log = logging.getLogger('hellofastagi')
-class DialPlan( object ):
- """Stupid little application to report how many times it's been accessed"""
- def __init__( self ):
- self.count = 0
- def __call__( self, agi ):
- """Store the AGI instance for later usage, kick off our operations"""
- self.agi = agi
- return self.start()
- def start( self ):
- """Begin the dial-plan-like operations"""
- return self.agi.answer().addCallbacks( self.onAnswered, self.answerFailure )
- def answerFailure( self, reason ):
- """Deal with a failure to answer"""
- log.warn(
- """Unable to answer channel %r: %s""",
- self.agi.variables['agi_channel'], reason.getTraceback(),
- )
- self.agi.finish()
- def onAnswered( self, resultLine ):
- """We've managed to answer the channel, yay!"""
- self.count += 1
- return self.agi.wait( 2.0 ).addCallback( self.onWaited )
- def onWaited( self, result ):
- """We've finished waiting, tell the user the number"""
- return self.agi.sayNumber( self.count, '*' ).addErrback(
- self.onNumberFailed,
- ).addCallbacks(
- self.onFinished, self.onFinished,
- )
- def onFinished( self, resultLine ):
- """We said the number correctly, hang up on the user"""
- return self.agi.finish()
- def onNumberFailed( self, reason ):
- """We were unable to read the number to the user"""
- log.warn(
- """Unable to read number to user on channel %r: %s""",
- self.agi.variables['agi_channel'], reason.getTraceback(),
- )
-
- def onHangupFailure( self, reason ):
- """Failed trying to hang up"""
- log.warn(
- """Unable to hang up channel %r: %s""",
- self.agi.variables['agi_channel'], reason.getTraceback(),
- )
+
+class DialPlan(object):
+ """Stupid little application to report how many times it's been accessed"""
+
+ def __init__(self):
+ self.count = 0
+
+ def __call__(self, agi):
+ """Store the AGI instance for later usage, kick off our operations"""
+ self.agi = agi
+ return self.start()
+
+ def start(self):
+ """Begin the dial-plan-like operations"""
+ return self.agi.answer().addCallbacks(self.onAnswered, self.answerFailure)
+
+ def answerFailure(self, reason):
+ """Deal with a failure to answer"""
+ log.warn(
+ """Unable to answer channel %r: %s""",
+ self.agi.variables['agi_channel'], reason.getTraceback(),
+ )
+ self.agi.finish()
+
+ def onAnswered(self, resultLine):
+ """We've managed to answer the channel, yay!"""
+ self.count += 1
+ return self.agi.wait(2.0).addCallback(self.onWaited)
+
+ def onWaited(self, result):
+ """We've finished waiting, tell the user the number"""
+ return self.agi.sayNumber(self.count, '*').addErrback(
+ self.onNumberFailed,
+ ).addCallbacks(
+ self.onFinished, self.onFinished,
+ )
+
+ def onFinished(self, resultLine):
+ """We said the number correctly, hang up on the user"""
+ return self.agi.finish()
+
+ def onNumberFailed(self, reason):
+ """We were unable to read the number to the user"""
+ log.warn(
+ """Unable to read number to user on channel %r: %s""",
+ self.agi.variables['agi_channel'], reason.getTraceback(),
+ )
+
+ def onHangupFailure(self, reason):
+ """Failed trying to hang up"""
+ log.warn(
+ """Unable to hang up channel %r: %s""",
+ self.agi.variables['agi_channel'], reason.getTraceback(),
+ )
+
if __name__ == "__main__":
- logging.basicConfig()
- fastagi.log.setLevel( logging.DEBUG )
- f = fastagi.FastAGIFactory(DialPlan())
- reactor.listenTCP(4573, f, 50, '127.0.0.1') # only binding on local interface
- reactor.run()
+ logging.basicConfig()
+ fastagi.log.setLevel(logging.DEBUG)
+ f = fastagi.FastAGIFactory(DialPlan())
+ reactor.listenTCP(4573, f, 50, '127.0.0.1') # only binding on local interface
+ reactor.run()
diff --git a/examples/timestamp.py b/examples/timestamp.py
index e66e832..1793d6b 100644
--- a/examples/timestamp.py
+++ b/examples/timestamp.py
@@ -2,44 +2,52 @@
"""Provide a trivial date-and-time service"""
from twisted.internet import reactor
from starpy import fastagi
-import logging, time
+import logging
+import time
-log = logging.getLogger( 'dateandtime' )
+log = logging.getLogger('dateandtime')
-def testFunction( agi ):
- """Give time for some time a bit in the future"""
- log.debug( 'testFunction' )
- df = agi.streamFile( 'at-tone-time-exactly' )
- def onFailed( reason ):
- log.error( "Failure: %s", reason.getTraceback())
- return None
- def cleanup( result ):
- agi.finish()
- return result
- def onSaid( resultLine ):
- """Having introduced, actually read the time"""
- t = time.time()
- t2 = t+20.0
- df = agi.sayDateTime( t2, format='HM' )
- def onDateFinished( resultLine ):
- # now need to sleep until .5 seconds before the time
- df = agi.wait( t2-.5-time.time() )
- def onDoBeep( result ):
- df = agi.streamFile( 'beep' )
- return df
- return df.addCallback( onDoBeep )
- return df.addCallback( onDateFinished )
- return df.addCallback(
- onSaid
- ).addErrback(
- onFailed
- ).addCallbacks(
- cleanup, cleanup,
- )
+
+def testFunction(agi):
+ """Give time for some time a bit in the future"""
+ log.debug('testFunction')
+ df = agi.streamFile('at-tone-time-exactly')
+
+ def onFailed(reason):
+ log.error("Failure: %s", reason.getTraceback())
+ return None
+
+ def cleanup(result):
+ agi.finish()
+ return result
+
+ def onSaid(resultLine):
+ """Having introduced, actually read the time"""
+ t = time.time()
+ t2 = t+20.0
+ df = agi.sayDateTime(t2, format='HM')
+
+ def onDateFinished(resultLine):
+ # now need to sleep until .5 seconds before the time
+ df = agi.wait(t2-.5-time.time())
+
+ def onDoBeep(result):
+ df = agi.streamFile('beep')
+ return df
+ return df.addCallback(onDoBeep)
+ return df.addCallback(onDateFinished)
+ return df.addCallback(
+ onSaid
+ ).addErrback(
+ onFailed
+ ).addCallbacks(
+ cleanup, cleanup,
+ )
+
if __name__ == "__main__":
- logging.basicConfig()
- fastagi.log.setLevel( logging.INFO )
- f = fastagi.FastAGIFactory(testFunction)
- reactor.listenTCP(4574, f, 50, '127.0.0.1') # only binding on local interface
- reactor.run()
+ logging.basicConfig()
+ fastagi.log.setLevel(logging.INFO)
+ f = fastagi.FastAGIFactory(testFunction)
+ reactor.listenTCP(4574, f, 50, '127.0.0.1') # only binding on local interface
+ reactor.run()
diff --git a/examples/timestampapp.py b/examples/timestampapp.py
index 0585155..dba94b6 100644
--- a/examples/timestampapp.py
+++ b/examples/timestampapp.py
@@ -3,46 +3,55 @@
from twisted.internet import reactor
from starpy import fastagi
import utilapplication
-import logging, time
+import logging
+import time
-log = logging.getLogger( 'dateandtime' )
+log = logging.getLogger('dateandtime')
-def testFunction( agi ):
- """Give time for some time a bit in the future"""
- log.debug( 'testFunction' )
- df = agi.streamFile( 'at-tone-time-exactly' )
- def onFailed( reason ):
- log.error( "Failure: %s", reason.getTraceback())
- return None
- def cleanup( result ):
- agi.finish()
- return result
- def onSaid( resultLine ):
- """Having introduced, actually read the time"""
- t = time.time()
- t2 = t+7.0
- df = agi.sayDateTime( t2, format='HMS' )
- def onDateFinished( resultLine ):
- # now need to sleep until .05 seconds before the time
- df = agi.wait( t2-.05-time.time() )
- def onDoBeep( result ):
- df = agi.streamFile( 'beep' )
- return df
- def waitTwo( result ):
- return agi.streamFile( 'thank-you-for-calling' )
- return df.addCallback( onDoBeep ).addCallback( waitTwo )
- return df.addCallback( onDateFinished )
- return df.addCallback(
- onSaid
- ).addErrback(
- onFailed
- ).addCallbacks(
- cleanup, cleanup,
- )
+
+def testFunction(agi):
+ """Give time for some time a bit in the future"""
+ log.debug('testFunction')
+ df = agi.streamFile('at-tone-time-exactly')
+
+ def onFailed(reason):
+ log.error("Failure: %s", reason.getTraceback())
+ return None
+
+ def cleanup(result):
+ agi.finish()
+ return result
+
+ def onSaid(resultLine):
+ """Having introduced, actually read the time"""
+ t = time.time()
+ t2 = t+7.0
+ df = agi.sayDateTime(t2, format='HMS')
+
+ def onDateFinished(resultLine):
+ # now need to sleep until .05 seconds before the time
+ df = agi.wait(t2-.05-time.time())
+
+ def onDoBeep(result):
+ df = agi.streamFile('beep')
+ return df
+
+ def waitTwo(result):
+ return agi.streamFile('thank-you-for-calling')
+ return df.addCallback(onDoBeep).addCallback(waitTwo)
+ return df.addCallback(onDateFinished)
+ return df.addCallback(
+ onSaid
+ ).addErrback(
+ onFailed
+ ).addCallbacks(
+ cleanup, cleanup,
+ )
+
if __name__ == "__main__":
- logging.basicConfig()
- fastagi.log.setLevel( logging.INFO )
- APPLICATION = utilapplication.UtilApplication()
- reactor.callWhenRunning( APPLICATION.agiSpecifier.run, testFunction )
- reactor.run()
+ logging.basicConfig()
+ fastagi.log.setLevel(logging.INFO)
+ APPLICATION = utilapplication.UtilApplication()
+ reactor.callWhenRunning(APPLICATION.agiSpecifier.run, testFunction)
+ reactor.run()
diff --git a/examples/utilapplication.py b/examples/utilapplication.py
index 0e51901..e7fc6ed 100644
--- a/examples/utilapplication.py
+++ b/examples/utilapplication.py
@@ -1,6 +1,6 @@
#
# StarPy -- Asterisk Protocols for Twisted
-#
+#
# Copyright (c) 2006, Michael C. Fletcher
#
# Michael C. Fletcher <mcfletch at vrplumber.com>
@@ -15,15 +15,17 @@
# details.
"""Class providing utility applications with common support code"""
-from basicproperty import common, propertied, basic, weak
+from basicproperty import common, propertied, basic
from ConfigParser import ConfigParser
from starpy import fastagi, manager
from twisted.internet import defer, reactor
-import logging,os
+import logging
+import os
-log = logging.getLogger( 'app' )
+log = logging.getLogger('app')
-class UtilApplication( propertied.Propertied ):
+
+class UtilApplication(propertied.Propertied):
"""Utility class providing simple application-level operations
FastAGI entry points are waitForCallOn and handleCallsFor, which allow
@@ -32,82 +34,93 @@
(as specified in self.configFiles).
"""
amiSpecifier = basic.BasicProperty(
- "amiSpecifier", """AMI connection specifier for the application see AMISpecifier""",
- defaultFunction = lambda prop,client: AMISpecifier()
+ "amiSpecifier",
+ """AMI connection specifier for the application see AMISpecifier""",
+ defaultFunction=lambda prop, client: AMISpecifier()
)
agiSpecifier = basic.BasicProperty(
- "agiSpecifier", """FastAGI server specifier for the application see AGISpecifier""",
- defaultFunction = lambda prop,client: AGISpecifier()
+ "agiSpecifier",
+ """FastAGI server specifier for the application see AGISpecifier""",
+ defaultFunction=lambda prop, client: AGISpecifier()
)
extensionWaiters = common.DictionaryProperty(
"extensionWaiters", """Set of deferreds waiting for incoming extensions""",
)
extensionHandlers = common.DictionaryProperty(
- "extensionHandlers", """Set of permanant callbacks waiting for incoming extensions""",
+ "extensionHandlers",
+ """Set of permanant callbacks waiting for incoming extensions""",
)
- configFiles = configFiles=('starpy.conf','~/.starpy.conf')
- def __init__( self ):
+ configFiles = ('starpy.conf', '~/.starpy.conf')
+
+ def __init__(self):
"""Initialise the application from options in configFile"""
self.loadConfigurations()
- def loadConfigurations( self ):
- parser = self._loadConfigFiles( self.configFiles )
- self._copyPropertiesFrom( parser, 'AMI', self.amiSpecifier )
- self._copyPropertiesFrom( parser, 'FastAGI', self.agiSpecifier )
+
+ def loadConfigurations(self):
+ parser = self._loadConfigFiles(self.configFiles)
+ self._copyPropertiesFrom(parser, 'AMI', self.amiSpecifier)
+ self._copyPropertiesFrom(parser, 'FastAGI', self.agiSpecifier)
return parser
- def _loadConfigFiles( self, configFiles ):
+
+ def _loadConfigFiles(self, configFiles):
"""Load options from configuration files given (if present)"""
- parser = ConfigParser( )
+ parser = ConfigParser()
filenames = [
- os.path.abspath( os.path.expandvars( os.path.expanduser( file ) ))
+ os.path.abspath(os.path.expandvars(os.path.expanduser(file)))
for file in configFiles
]
- log.info( "Possible configuration files:\n\t%s", "\n\t".join(filenames) or None)
+ log.info("Possible configuration files:\n\t%s", "\n\t".join(filenames) or None)
filenames = [
file for file in filenames
if os.path.isfile(file)
]
- log.info( "Actual configuration files:\n\t%s", "\n\t".join(filenames) or None)
- parser.read( filenames )
+ log.info("Actual configuration files:\n\t%s", "\n\t".join(filenames) or None)
+ parser.read(filenames)
return parser
- def _copyPropertiesFrom( self, parser, section, client, properties=None ):
+
+ def _copyPropertiesFrom(self, parser, section, client, properties=None):
"""Copy properties from the config-parser's given section into client"""
if properties is None:
properties = client.getProperties()
for property in properties:
- if parser.has_option( section, property.name ):
+ if parser.has_option(section, property.name):
try:
- value = parser.get( section, property.name )
- setattr( client, property.name, value )
- except (TypeError,ValueError,AttributeError,NameError), err:
- log( """Unable to set property %r of %r to config-file value %r: %s"""%(
- property.name, client, parser.get( section, property.name, 1), err,
+ value = parser.get(section, property.name)
+ setattr(client, property.name, value)
+ except (TypeError, ValueError, AttributeError, NameError) as err:
+ log('Unable to set property %r of %r to config-file value %r: %s' % (
+ property.name, client, parser.get(section, property.name, 1), err,
))
return client
- def dispatchIncomingCall( self, agi ):
+
+ def dispatchIncomingCall(self, agi):
"""Handle an incoming call (dispatch to the appropriate registered handler)"""
extension = agi.variables['agi_extension']
- log.info( """AGI connection with extension: %r""", extension )
+ log.info("""AGI connection with extension: %r""", extension)
try:
- df = self.extensionWaiters.pop( extension )
- except KeyError, err:
+ df = self.extensionWaiters.pop(extension)
+ except KeyError as err:
try:
- callback = self.extensionHandlers[ extension ]
- except KeyError, err:
+ callback = self.extensionHandlers[extension]
+ except KeyError as err:
try:
- callback = self.extensionHandlers[ None ]
- except KeyError, err:
- log.warn( """Unexpected connection to extension %r: %s""", extension, agi.variables )
+ callback = self.extensionHandlers[None]
+ except KeyError as err:
+ log.warn("""Unexpected connection to extension %r: %s""",
+ extension, agi.variables)
agi.finish()
return
try:
- return callback( agi )
- except Exception, err:
- log.error( """Failure during callback %s for agi %s: %s""", callback, agi.variables, err )
+ return callback(agi)
+ except Exception as err:
+ log.error("""Failure during callback %s for agi %s: %s""",
+ callback, agi.variables, err)
# XXX return a -1 here
else:
if not df.called:
- df.callback( agi )
- def waitForCallOn( self, extension, timeout=15 ):
+ df.callback(agi)
+
+ def waitForCallOn(self, extension, timeout=15):
"""Wait for an AGI call on extension given
extension -- string extension for which to wait
@@ -122,17 +135,19 @@
returns deferred returning connected FastAGIProtocol or an error
"""
extension = str(extension)
- log.info( 'Waiting for extension %r for %s seconds', extension, timeout )
- df = defer.Deferred( )
- self.extensionWaiters[ extension ] = df
- def onTimeout( ):
+ log.info('Waiting for extension %r for %s seconds', extension, timeout)
+ df = defer.Deferred()
+ self.extensionWaiters[extension] = df
+
+ def onTimeout():
if not df.called:
- df.errback( defer.TimeoutError(
- """Timeout waiting for call on extension: %r"""%(extension,)
+ df.errback(defer.TimeoutError(
+ """Timeout waiting for call on extension: %r""" % (extension,)
))
- reactor.callLater( timeout, onTimeout )
+ reactor.callLater(timeout, onTimeout)
return df
- def handleCallsFor( self, extension, callback ):
+
+ def handleCallsFor(self, extension, callback):
"""Register permanant handler for given extension
extension -- string extension for which to wait or None to define
@@ -150,9 +165,10 @@
"""
if extension is not None:
extension = str(extension)
- self.extensionHandlers[ extension ] = callback
+ self.extensionHandlers[extension] = callback
-class AMISpecifier( propertied.Propertied ):
+
+class AMISpecifier(propertied.Propertied):
"""Manager interface setup/specifier"""
username = common.StringLocaleProperty(
"username", """Login username for the manager interface""",
@@ -163,36 +179,39 @@
password = secret
server = common.StringLocaleProperty(
"server", """Server IP address to which to connect""",
- defaultValue = '127.0.0.1',
+ defaultValue='127.0.0.1',
)
port = common.IntegerProperty(
"port", """Server IP port to which to connect""",
- defaultValue = 5038,
+ defaultValue=5038,
)
timeout = common.FloatProperty(
"timeout", """Timeout in seconds for an AMI connection timeout""",
- defaultValue = 5.0,
+ defaultValue=5.0,
)
- def login( self ):
+
+ def login(self):
"""Login to the specified manager via the AMI"""
theManager = manager.AMIFactory(self.username, self.secret)
return theManager.login(self.server, self.port, timeout=self.timeout)
-class AGISpecifier( propertied.Propertied ):
+
+class AGISpecifier(propertied.Propertied):
"""Specifier of where we send the user to connect to our AGI"""
port = common.IntegerProperty(
"port", """IP port on which to listen""",
- defaultValue = 4573,
+ defaultValue=4573,
)
interface = common.StringLocaleProperty(
"interface", """IP interface on which to listen (local only by default)""",
- defaultValue = '127.0.0.1',
+ defaultValue='127.0.0.1',
)
context = common.StringLocaleProperty(
"context", """Asterisk context to which to connect incoming calls""",
- defaultValue = 'survey',
+ defaultValue='survey',
)
- def run( self, mainFunction ):
+
+ def run(self, mainFunction):
"""Start up the AGI server with the given mainFunction"""
f = fastagi.FastAGIFactory(mainFunction)
return reactor.listenTCP(self.port, f, 50, self.interface)
diff --git a/starpy/fastagi.py b/starpy/fastagi.py
index 7e26284..85de054 100644
--- a/starpy/fastagi.py
+++ b/starpy/fastagi.py
@@ -28,7 +28,6 @@
from twisted.internet import protocol, reactor, defer
from twisted.internet import error as tw_error
from twisted.protocols import basic
-import socket
import logging
import time
from starpy import error
@@ -236,9 +235,10 @@
if endpos == skipMS:
# "likely" an error according to the wiki,
# we'll raise an error...
- raise error.AGICommandFailure(FAILURE_CODE,
- "End position %s == original position, "
- "result code %s" % (endpos, digit))
+ raise error.AGICommandFailure(
+ FAILURE_CODE,
+ "End position %s == original position, "
+ "result code %s" % (endpos, digit))
return digit, endpos
raise ValueError("Unexpected result on streaming completion: %r" %
resultLine)
@@ -267,8 +267,8 @@
sequence = InSequence()
sequence.append(self.setContext, self.variables['agi_context'])
sequence.append(self.setExtension, self.variables['agi_extension'])
- sequence.append(self.setPriority, int(self.variables['agi_priority'])
- + difference)
+ sequence.append(self.setPriority,
+ int(self.variables['agi_priority']) + difference)
sequence.append(self.finish)
return sequence()
@@ -326,7 +326,7 @@
"""
parts = resultLine.split(' ', 1)
result = int(parts[0])
- endpos = None # Default if endpos isn't specified
+ endpos = None # Default if endpos isn't specified
if len(parts) == 2:
endposStuff = parts[1].strip()
if endposStuff.startswith('endpos='):
@@ -379,7 +379,7 @@
"""
command = 'DATABASE DELTREE "%s"' % (family,)
if keyTree:
- command += ' "%s"' % (keytree,)
+ command += ' "%s"' % (keyTree,)
return self.sendCommand(command).addCallback(
self.checkFailure, failure='0',
).addCallback(self.resultAsInt)
@@ -629,8 +629,7 @@
def recordFile(
self, filename, format, escapeDigits, timeout=-1,
- offsetSamples=None, beep=True, silence=None,
- ):
+ offsetSamples=None, beep=True, silence=None,):
"""Record channel to given filename until escapeDigits or silence
filename -- filename on the server to which to save
diff --git a/starpy/manager.py b/starpy/manager.py
index 6876f74..7adac62 100644
--- a/starpy/manager.py
+++ b/starpy/manager.py
@@ -46,11 +46,13 @@
"""A subclass of defer.Deferred that adds a registerError method
to handle function callback when an Error response happens"""
_errorRespCallback = None
- def registerError(self, function ):
+
+ def registerError(self, function):
"""Add function for Error response callback"""
self._errorRespCallback = function
log.debug('Registering function %s to handle Error response'
% (function))
+
class AMIProtocol(basic.LineOnlyReceiver):
"""Protocol for the interfacing with the Asterisk Manager Interface (AMI)
@@ -239,10 +241,10 @@
if line:
if line.endswith(self.END_DATA):
# multi-line command results...
- message.setdefault(' ', []).extend([
- l for l in line.split('\n')
- if (l and l != self.END_DATA)
- ])
+ message.setdefault(' ', []).extend(
+ [l for l in line.split('\n')
+ if (l and l != self.END_DATA)]
+ )
else:
# regular line...
if line.startswith(self.VERSION_PREFIX):
@@ -319,8 +321,9 @@
def checkErrorResponse(self, result, actionid, df):
"""Check for error response and callback"""
- self.cleanup( result, actionid)
- if isinstance(result, dict) and result.get('response') == 'Error' and df._errorRespCallback:
+ self.cleanup(result, actionid)
+ if isinstance(result, dict) and result.get('response') == 'Error' \
+ and df._errorRespCallback:
df._errorRespCallback(result)
return result
@@ -398,7 +401,7 @@
raise error.AMICommandFailure(message)
return message
- ## End-user API
+ # End-user API
def absoluteTimeout(self, channel, timeout):
"""Set timeout value for the given channel (in seconds)"""
message = {
@@ -461,10 +464,9 @@
def action(self, action, **action_args):
"""Sends an arbitrary action to the AMI"""
- #action_args will be ar least an empty dict so we build the message from it.
+ # action_args will be at least an empty dict so we build the message from it.
action_args['action'] = action
return self.sendDeferred(action_args).addCallback(self.errorUnlessResponse)
-
def dbDel(self, family, key):
"""Delete key value in the AstDB database"""
@@ -493,7 +495,8 @@
value = event['val']
self.deregisterEvent("DBGetResponse", extractValue)
return df.callback(value)
- def errorResponse( message ):
+
+ def errorResponse(message):
self.deregisterEvent("DBGetResponse", extractValue)
return df.callback(None)
message = {
@@ -612,14 +615,16 @@
def loginChallengeResponse(self):
"""Log into the AMI interface with challenge-response.
- Follows the same approach as self.login() using factory.username and factory.secret.
- Also done automatically on connection: will be called instead of self.login() if
- factory.plaintext_login is False: see AMIFactory constructor.
+ Follows the same approach as self.login() using factory.username and
+ factory.secret. Also done automatically on connection: will be called
+ instead of self.login() if factory.plaintext_login is False: see
+ AMIFactory constructor.
"""
def sendResponse(challenge):
- if not type(challenge) is dict or not 'challenge' in challenge:
+ if not type(challenge) is dict or 'challenge' not in challenge:
raise error.AMICommandFailure(challenge)
- key_value = md5('%s%s' % (challenge['challenge'], self.factory.secret)).hexdigest()
+ key_value = md5('%s%s' % (challenge['challenge'], self.factory.secret)) \
+ .hexdigest()
return self.sendDeferred({
'action': 'Login',
'authtype': 'MD5',
@@ -712,8 +717,7 @@
self, channel, context=None, exten=None, priority=None,
timeout=None, callerid=None, account=None, application=None,
data=None, variable={}, async=False, channelid=None,
- otherchannelid=None
- ):
+ otherchannelid=None):
"""Originate call to connect channel to given context/exten/priority
channel -- the outgoing channel to which will be dialed
@@ -886,7 +890,7 @@
message = {
'action': 'queues'
}
- #return self.collectDeferred(message, 'QueueStatusEnd')
+ # return self.collectDeferred(message, 'QueueStatusEnd')
return self.sendDeferred(message).addCallback(self.errorUnlessResponse)
def queueStatus(self, queue=None, member=None):
@@ -1101,7 +1105,8 @@
"""
protocol = AMIProtocol
- def __init__(self, username, secret, id=None, plaintext_login=True, on_reconnect=None):
+ def __init__(self, username, secret, id=None, plaintext_login=True,
+ on_reconnect=None):
self.username = username
self.secret = secret
self.id = id
diff --git a/tox.ini b/tox.ini
index a6c1ede..dd04c12 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,9 @@
[tox]
envlist = py26,py27,py32,py33,py34,p35,pep8
+[pep8]
+max-line-length=90
+
[testenv]
deps = -r{toxinidir}/tools/pip-requires
-r{toxinidir}/tools/test-requires
--
To view, visit https://gerrit.asterisk.org/8990
To unsubscribe, visit https://gerrit.asterisk.org/settings
Gerrit-Project: starpy
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I293635427a48e66298042c3fc1e600e47854d3ed
Gerrit-Change-Number: 8990
Gerrit-PatchSet: 1
Gerrit-Owner: Corey Farrell <git at cfware.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20180514/b23ce0e8/attachment-0001.html>
More information about the asterisk-code-review
mailing list