[asterisk-commits] jpeeler: branch jpeeler/event_watcher r268 - /asterisk/team/jpeeler/event_watcher/tests/ami-monitor/
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Apr 29 17:46:03 CDT 2010
Author: jpeeler
Date: Thu Apr 29 17:45:59 2010
New Revision: 268
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=268
Log:
attempt to use a common naming convention
Modified:
asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
asterisk/team/jpeeler/event_watcher/tests/ami-monitor/run-test
Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py?view=diff&rev=268&r1=267&r2=268
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py Thu Apr 29 17:45:59 2010
@@ -27,9 +27,9 @@
def clear_vars(self):
- self.eventList = list()
+ self.event_list = list()
self.count = 0
- self.sendEventList = list()
+ self.send_event_list = list()
self.ordered = False
def __init__(self):
@@ -38,10 +38,10 @@
self.ami = None
self.testcount = 0
self.passed = True
- self.callID = None
+ self.call_id = None
self.timeout_sec = 5
- self.reactorLock = threading.Lock()
+ self.reactor_lock = threading.Lock()
self.reactor_stopped = False
self.clear_vars()
@@ -49,17 +49,13 @@
if STANDALONE:
return
- self.log.info("Creating Asterisk instance ...")
- self.asterisk = Asterisk(base=os.path.join(os.getcwd(),
- "tests/ami-monitor/tmp"))
+ self.log.info("Creating Asterisk instance...")
+ self.asterisk = Asterisk(base="/tmp/asterisk-testsuite/ami-monitor")
self.asterisk.install_config("tests/ami-monitor/configs/manager.conf")
self.asterisk.install_config("tests/ami-monitor/configs/logger.conf")
self.start_asterisk()
- def next_test(self, cb):
- self.next_test = cb
-
def set_timeout(self, seconds):
self.timeout_sec = seconds
@@ -70,14 +66,14 @@
self.ordered = ordered
def add_event(self, event):
- self.eventList.append(event)
+ self.event_list.append(event)
self.count = self.count + 1
def add_send_event(self, event):
- self.sendEventList.append(event)
+ self.send_event_list.append(event)
def set_events(self, ordered, list):
- self.eventList = list
+ self.event_list = list
self.count = len(list)
self.ordered = ordered
@@ -86,17 +82,17 @@
return
self.log.debug("Showing events")
- for n in self.eventList:
+ for n in self.event_list:
self.log.debug("Event: %s" % n)
def get_events(self):
- return self.eventList
+ return self.event_list
def check_events(self):
- if not self.eventList:
+ if not self.event_list:
return False
- for events in self.eventList:
+ for events in self.event_list:
matched = False
for event in events:
if "match" in event:
@@ -109,22 +105,22 @@
def timeout(self):
self.log.debug("Timed out")
self.passed = False
- self.reactorLock.acquire()
+ self.reactor_lock.acquire()
if not self.reactor_stopped:
reactor.stop()
self.reactor_stopped = True
- self.reactorLock.release()
- self.stop_asterisk()
+ self.stop_asterisk()
+ self.reactor_lock.release()
def end_test(self):
- self.reactorLock.acquire()
+ self.reactor_lock.acquire()
if not self.reactor_stopped:
- self.callID.cancel()
+ self.call_id.cancel()
reactor.stop()
self.reactor_stopped = True
- self.reactorLock.release()
+ self.stop_asterisk()
+ self.reactor_lock.release()
self.log.info("DONE - end_test")
- self.stop_asterisk()
def dict_in_dict(self, d_set, d_subset):
return len(d_subset) == len(set(d_subset.items()) & set(d_set.items()))
@@ -132,58 +128,57 @@
def start(self):
utilapplication.UtilApplication.configFiles = (os.getcwd() + '/tests/ami-monitor/starpy.conf', 'starpy.conf')
# Log into AMI
- amiDF = utilapplication.UtilApplication().amiSpecifier.login().addCallbacks(self.onAMIConnect, self.onAMIFailure)
-
- def sendEvents(self, ami):
- if self.sendEventList:
- for amessage in self.sendEventList:
- id = ami.sendMessage(amessage, self.onSentResponse)
- self.log.debug("SENDING %s with id %s" % (amessage, id))
-
- def onAMIFailure(self, ami):
+ amiDF = utilapplication.UtilApplication().amiSpecifier.login().addCallbacks(self.on_connect, self.on_failure)
+
+ def send_events(self, ami):
+ if self.send_event_list:
+ for amessage in self.send_event_list:
+ id = ami.sendMessage(amessage, self.on_sent_response)
+ self.log.debug("Sending %s with id %s" % (amessage, id))
+
+ def on_failure(self, ami):
self.log.critical("Stopping asterisk, login failure")
self.passed = False
self.stop_asterisk()
- def onAMIConnect(self, ami):
+ def on_connect(self, ami):
"""Register for AMI events"""
# XXX should handle asterisk reboots (at the moment the AMI
# interface will just stop generating events), not a practical
# problem at the moment, but should have a periodic check to be sure
# the interface is still up, and if not, should close and restart
- self.log.debug('onAMIConnect')
- ami.status().addCallback(self.onStatus, ami=ami)
-
- if len(self.eventList) or len(self.sendEventList) != 0:
+ ami.status().addCallback(self.on_status, ami=ami)
+
+ if len(self.event_list) or len(self.send_event_list) != 0:
self.log.warn("Don't load events outside of a test method!")
- self.loadNextTest(ami, True)
-
- if len(self.eventList) == 0:
+ self.load_next_test(ami, True)
+
+ if len(self.event_list) == 0:
self.log.error("No events to monitor!")
- ami.registerEvent(None, self.onAnyEvent)
+ ami.registerEvent(None, self.on_any_event)
self.ami = ami
- self.sendEvents(ami)
-
- def onSentResponse(self, result):
+ self.send_events(ami)
+
+ def on_sent_response(self, result):
# don't care about connection terminated event
if result.__str__() != "Connection was closed cleanly: FastAGI connection terminated.":
self.log.debug("Result from sent event: %s", result)
- self.onAnyEvent(self.ami, result)
+ self.on_any_event(self.ami, result)
else:
self.log.debug("Ignoring connection close event")
def connectionLost(reason):
self.log.critical("Connection lost: %s", reason)
- def onAnyEvent(self, ami, event):
-
- self.log.debug("Running onAnyEvent")
+ def on_any_event(self, ami, event):
+
+ self.log.debug("Running on_any_event")
if not self.ordered:
- for next_events in self.eventList:
+ for next_events in self.event_list:
for next_event in next_events:
self.log.debug("Looking at %s vs %s" % (next_event, event))
if self.dict_in_dict(event, next_event):
@@ -191,41 +186,41 @@
self.log.debug("New event %s" % next_event)
self.count = self.count - 1
if self.count == 0:
- self.loadNextTest(ami)
+ self.load_next_test(ami)
else:
- index = abs(self.count - len(self.eventList))
- for next_event in self.eventList[index]:
+ index = abs(self.count - len(self.event_list))
+ for next_event in self.event_list[index]:
if self.dict_in_dict(event, next_event):
next_event.update({"match": time.time()})
self.log.debug("New event %s" % next_event)
self.count = self.count - 1
if self.count == 0:
- self.loadNextTest(ami)
+ self.load_next_test(ami)
continue
- def next_test(self, count, toexec):
+ def exec_next_test(self, count, toexec):
self.log.info("Next test count %s", count)
if not hasattr(self.testobj, toexec):
self.log.debug("No more tests")
return -1
- self.log.debug("Next test (%s) exists", toexec)
- if self.callID:
- self.callID.cancel()
+ self.log.debug("Test method %s exists", toexec)
+ if self.call_id:
+ self.call_id.cancel()
method = getattr(self.testobj, toexec)
self.log.debug("Begin executing %s", toexec)
method(self)
self.log.debug("Finish executing %s", toexec)
- if len(self.eventList) > 0:
+ if len(self.event_list) > 0:
self.log.debug("Rescheduling timeout")
- self.callID = reactor.callLater(self.timeout_sec, self.timeout)
+ self.call_id = reactor.callLater(self.timeout_sec, self.timeout)
return 0
self.log.warn("Returning, no events added by test method!")
return -1 # exception or something
- def loadNextTest(self, ami, firstrun=None):
+ def load_next_test(self, ami, firstrun=None):
if not firstrun:
self.log.debug("About to shut down all monitoring")
ami.deregisterEvent(None, None)
@@ -242,17 +237,17 @@
self.clear_vars()
toexec = "test" + str(self.testcount)
- res = self.next_test(self.testcount, toexec)
+ res = self.exec_next_test(self.testcount, toexec)
self.log.debug("res %s from exec %s" % (res, toexec))
if res == -1:
self.ami = None
self.end_test()
return
self.testcount = self.testcount + 1
- ami.registerEvent(None, self.onAnyEvent)
- self.sendEvents(ami)
-
- def onStatus(self, events, ami=None):
+ ami.registerEvent(None, self.on_any_event)
+ self.send_events(ami)
+
+ def on_status(self, events, ami=None):
self.log.debug("Initial channel status retrieved")
if events:
self.log.critical("Test expects no channels to have started yet, aborting!")
@@ -262,51 +257,7 @@
for event in events:
self.log.debug("Received event: %s", event)
-def main():
- logging.basicConfig()
-
- manager.log.setLevel(logging.DEBUG)
-
- watcher = EventWatcher()
- watcher.log.setLevel(logging.DEBUG)
-
- #scenario 1 with optional event matching
- #event1 = [{'event' : 'Alarm', 'channel' : '18' }]
- #event2 = [{'event' : 'Alarm', 'channel' : '17' }]
- #event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
- #self.add_event(event1)
- #self.add_event(event2)
- #self.add_event(event3)
- #self.set_ordered(False)
-
- #scenario 2 with optional event matching
- event1 = [{'event' : 'Alarm', 'channel' : '18' }]
- event2 = [{'event' : 'Alarm', 'channel' : '17' }]
- event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
- watcher.add_event(event1)
- watcher.add_event(event2)
- watcher.add_event(event3)
- watcher.set_ordered(True)
-
- # alternative event set up:
- #self.set_events(False,
- # [[{'event' : 'Alarm', 'channel' : '18' }],
- # [{'event' : 'Alarm', 'channel' : '17' }],
- # [{'event' : 'Alarm', 'channel' : '19' }, {'event' : 'Alarm', 'channel' : '20'}]])
-
- reactor.callWhenRunning(watcher.start)
- reactor.callLater(15, watcher.timeout)
- reactor.run()
-
- watcher.show_events()
-
- if watcher.check_events():
- print "DEBUG: all good!"
- return 0
- print "DEBUG: FAIL"
- return 1
-
if __name__ == "__main__":
- sys.exit(main() or 0)
+ print "This code is meant to be imported"
# vim:sw=4:ts=4:expandtab:textwidth=79
Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/run-test
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/run-test?view=diff&rev=268&r1=267&r2=268
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/run-test (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/run-test Thu Apr 29 17:45:59 2010
@@ -38,6 +38,12 @@
#watcher.add_event(event1)
#watcher.add_event(event2)
#watcher.add_event(event3)
+
+ # alternative event set up:
+ #watcher.set_events(False,
+ # [[{'event' : 'Alarm', 'channel' : '18' }],
+ # [{'event' : 'Alarm', 'channel' : '17' }],
+ # [{'event' : 'Alarm', 'channel' : '19' }, {'event' : 'Alarm', 'channel' : '20'}]])
#watcher.set_ordered(True)
def main():
More information about the asterisk-commits mailing list