[asterisk-commits] jpeeler: branch jpeeler/event_watcher r254 - /asterisk/team/jpeeler/event_wat...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Apr 27 16:07:53 CDT 2010
Author: jpeeler
Date: Tue Apr 27 16:07:50 2010
New Revision: 254
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=254
Log:
Convert the test module into a class so that the reactor timeout callback can easily be rescheduled for each individual test rather than being shared by all of them.
Modified:
asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py
Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py?view=diff&rev=254&r1=253&r2=254
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py Tue Apr 27 16:07:50 2010
@@ -72,7 +72,7 @@
#log.critical("JPEELER: Checking dictionaries")
return len(d_subset) == len(set(d_subset.items()) & set(d_set.items()))
- def main( self ):
+ def main(self):
# Log into AMI
amiDF = APPLICATION.amiSpecifier.login().addCallbacks(self.onAMIConnect, self.onAMIFailure)
Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py?view=diff&rev=254&r1=253&r2=254
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py Tue Apr 27 16:07:50 2010
@@ -10,88 +10,95 @@
from basicproperty import common, propertied, basic
import subprocess
-def test0(watcher):
- print "WOOT! - test0"
- event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
- watcher.add_event(event1)
- event_send = {'Action' : 'ping'}
- watcher.add_send_event(event_send)
- print "ENDWOOT"
+class Test():
-def test1(watcher):
- print "WOOT2! - test1"
- sleep(3) # DELETE THIS
- event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
- watcher.add_event(event1)
- event_send = {'Action' : 'ping'}
- watcher.add_send_event(event_send)
- print "ENDWOOT2"
+ def __init__(self):
+ callID = None
-def next_test(watcher, count, passed, toexec):
- print "NEXT TEST YAYA ", count, passed
- if not passed:
+ def test0(self, watcher):
+ print "WOOT! - test0"
+ event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+ watcher.add_event(event1)
+ event_send = {'Action' : 'ping'}
+ watcher.add_send_event(event_send)
+ print "ENDWOOT"
+
+ def test1(self, watcher):
+ print "WOOT2! - test1"
+ event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+ watcher.add_event(event1)
+ event_send = {'Action' : 'ping'}
+ watcher.add_send_event(event_send)
+ print "ENDWOOT2"
+
+ def next_test(self, watcher, count, passed, toexec):
+ print "NEXT TEST YAYA ", count, passed
+ if not passed:
+ return 1
+
+ if not hasattr(self, toexec):
+ print "DEBUG: no more tests"
+ return -1
+
+ print "DEBUG: next test exists"
+ if self.callID:
+ self.callID.cancel()
+ self.callID = reactor.callLater(5, watcher.timeout)
+ method = getattr(self, toexec)
+ method(watcher)
+ if len(watcher.eventList) > 0:
+ return 0
+
+ print "DEBUG: returning, no events..."
+ return -1 # exception or something
+
+ def main(self):
+ logging.basicConfig()
+
+ manager.log.setLevel(logging.DEBUG)
+ watcher = client.EventWatcher()
+ watcher.log.setLevel(logging.DEBUG)
+
+ #scenario 2 with optional event matching
+ #event1 = [{'event' : 'Alarm', 'channel' : '18' }]
+ #event2 = [{'event' : 'Alarm', 'channel' : '17' }]
+ #event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
+ #watcher.add_event(event1)
+ #watcher.add_event(event2)
+ #watcher.add_event(event3)
+ #watcher.set_ordered(True)
+
+ #event1 = [{'event' : 'Alarm', 'channel' : '17' }]
+ #event2 = [{'event' : 'Alarm', 'channel' : '18' }]
+ #event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
+ #watcher.add_event(event1)
+ #watcher.add_event(event2)
+ #watcher.add_event(event3)
+ #watcher.set_ordered(True)
+
+ watcher.next_test(self.next_test)
+
+ event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+ watcher.add_event(event1)
+ event_send = {'Action' : 'ping'}
+ watcher.add_send_event(event_send)
+
+ reactor.callWhenRunning(watcher.main)
+ self.callID = reactor.callLater(5, watcher.timeout)
+ reactor.run()
+
+ watcher.show_events()
+
+ if watcher.check_events():
+ print "DEBUG: all good!"
+ return 0
+ print "DEBUG: FAIL"
return 1
- #TODO: use hasattr
- print "about to see...", toexec
- try:
- obj = eval(toexec)
- except NameError:
- print "DEBUG: no more tests"
- return -1
- print "obj found ", obj
- if callable(obj):
- print "DEBUG: determined next test exists"
- eval(toexec)(watcher)
- if len(watcher.eventList) > 0:
- return 0
-
- print "DEBUG: returning, no events..."
- return -1 # exception or something
-
def main():
- logging.basicConfig()
-
- manager.log.setLevel(logging.DEBUG)
- watcher = client.EventWatcher()
- watcher.log.setLevel(logging.DEBUG)
-
- #scenario 2 with optional event matching
- #event1 = [{'event' : 'Alarm', 'channel' : '18' }]
- #event2 = [{'event' : 'Alarm', 'channel' : '17' }]
- #event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
- #watcher.add_event(event1)
- #watcher.add_event(event2)
- #watcher.add_event(event3)
- #watcher.set_ordered(True)
-
- #event1 = [{'event' : 'Alarm', 'channel' : '17' }]
- #event2 = [{'event' : 'Alarm', 'channel' : '18' }]
- #event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
- #watcher.add_event(event1)
- #watcher.add_event(event2)
- #watcher.add_event(event3)
- #watcher.set_ordered(True)
-
- watcher.next_test(next_test)
-
- event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
- watcher.add_event(event1)
- event_send = {'Action' : 'ping'}
- watcher.add_send_event(event_send)
-
- reactor.callWhenRunning(watcher.main)
- callID = reactor.callLater(1, watcher.timeout)
- reactor.run()
-
- watcher.show_events()
-
- if watcher.check_events():
- print "DEBUG: all good!"
- return 0
- print "DEBUG: FAIL"
- return 1
-
+ mytest = Test()
+ mytest.main()
+
if __name__ == "__main__":
sys.exit(main() or 0)
More information about the asterisk-commits mailing list