[asterisk-commits] jpeeler: branch jpeeler/event_watcher r254 - /asterisk/team/jpeeler/event_watcher/tests/ami-monitor/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Apr 27 16:07:53 CDT 2010


Author: jpeeler
Date: Tue Apr 27 16:07:50 2010
New Revision: 254

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=254
Log:
Convert the test module into a class so that the reactor timeout callback can easily be rescheduled for each individual test, rather than a single timeout applying to all of them.

Modified:
    asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
    asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py

Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py?view=diff&rev=254&r1=253&r2=254
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py Tue Apr 27 16:07:50 2010
@@ -72,7 +72,7 @@
         #log.critical("JPEELER: Checking dictionaries")
         return len(d_subset) == len(set(d_subset.items()) & set(d_set.items()))
 
-    def main( self ):
+    def main(self):
         # Log into AMI
         amiDF = APPLICATION.amiSpecifier.login().addCallbacks(self.onAMIConnect, self.onAMIFailure)
 

Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py?view=diff&rev=254&r1=253&r2=254
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py Tue Apr 27 16:07:50 2010
@@ -10,88 +10,95 @@
 from basicproperty import common, propertied, basic
 import subprocess
 
-def test0(watcher):
-    print "WOOT! - test0"
-    event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
-    watcher.add_event(event1)
-    event_send = {'Action' : 'ping'}
-    watcher.add_send_event(event_send)
-    print "ENDWOOT"
+class Test():
 
-def test1(watcher):
-    print "WOOT2! - test1"
-    sleep(3) # DELETE THIS
-    event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
-    watcher.add_event(event1)
-    event_send = {'Action' : 'ping'}
-    watcher.add_send_event(event_send)
-    print "ENDWOOT2"
+    def __init__(self):
+        callID = None
 
-def next_test(watcher, count, passed, toexec):
-    print "NEXT TEST YAYA ", count, passed
-    if not passed:
+    def test0(self, watcher):
+        print "WOOT! - test0"
+        event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+        watcher.add_event(event1)
+        event_send = {'Action' : 'ping'}
+        watcher.add_send_event(event_send)
+        print "ENDWOOT"
+    
+    def test1(self, watcher):
+        print "WOOT2! - test1"
+        event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+        watcher.add_event(event1)
+        event_send = {'Action' : 'ping'}
+        watcher.add_send_event(event_send)
+        print "ENDWOOT2"
+    
+    def next_test(self, watcher, count, passed, toexec):
+        print "NEXT TEST YAYA ", count, passed
+        if not passed:
+            return 1
+    
+        if not hasattr(self, toexec):
+             print "DEBUG: no more tests"
+             return -1
+
+        print "DEBUG: next test exists"
+        if self.callID:
+            self.callID.cancel()
+            self.callID = reactor.callLater(5, watcher.timeout)
+        method = getattr(self, toexec)
+        method(watcher)
+        if len(watcher.eventList) > 0:
+             return 0
+    
+        print "DEBUG: returning, no events..."
+        return -1 # exception or something
+    
+    def main(self):
+        logging.basicConfig()
+    
+        manager.log.setLevel(logging.DEBUG)
+        watcher = client.EventWatcher()
+        watcher.log.setLevel(logging.DEBUG)
+    
+        #scenario 2 with optional event matching
+        #event1 = [{'event' : 'Alarm', 'channel' : '18' }]
+        #event2 = [{'event' : 'Alarm', 'channel' : '17' }]
+        #event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
+        #watcher.add_event(event1)
+        #watcher.add_event(event2)
+        #watcher.add_event(event3)
+        #watcher.set_ordered(True)
+    
+        #event1 = [{'event' : 'Alarm', 'channel' : '17' }]
+        #event2 = [{'event' : 'Alarm', 'channel' : '18' }]
+        #event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
+        #watcher.add_event(event1)
+        #watcher.add_event(event2)
+        #watcher.add_event(event3)
+        #watcher.set_ordered(True)
+    
+        watcher.next_test(self.next_test)
+    
+        event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+        watcher.add_event(event1)
+        event_send = {'Action' : 'ping'}
+        watcher.add_send_event(event_send)
+    
+        reactor.callWhenRunning(watcher.main)
+        self.callID = reactor.callLater(5, watcher.timeout)
+        reactor.run()
+            
+        watcher.show_events()
+    
+        if watcher.check_events():
+            print "DEBUG: all good!"
+            return 0
+        print "DEBUG: FAIL"
         return 1
 
-    #TODO: use hasattr
-    print "about to see...", toexec
-    try:
-        obj = eval(toexec)
-    except NameError:
-        print "DEBUG: no more tests"
-        return -1
-    print "obj found ", obj
-    if callable(obj):
-        print "DEBUG: determined next test exists"
-        eval(toexec)(watcher)
-        if len(watcher.eventList) > 0:
-            return 0
-
-    print "DEBUG: returning, no events..."
-    return -1 # exception or something
-
 def main():
-    logging.basicConfig()
-
-    manager.log.setLevel(logging.DEBUG)
-    watcher = client.EventWatcher()
-    watcher.log.setLevel(logging.DEBUG)
-
-    #scenario 2 with optional event matching
-    #event1 = [{'event' : 'Alarm', 'channel' : '18' }]
-    #event2 = [{'event' : 'Alarm', 'channel' : '17' }]
-    #event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
-    #watcher.add_event(event1)
-    #watcher.add_event(event2)
-    #watcher.add_event(event3)
-    #watcher.set_ordered(True)
-
-    #event1 = [{'event' : 'Alarm', 'channel' : '17' }]
-    #event2 = [{'event' : 'Alarm', 'channel' : '18' }]
-    #event3 = [{'event' : 'Alarm', 'channel' : '20' }, {'event' : 'Alarm', 'channel' : '19'}]
-    #watcher.add_event(event1)
-    #watcher.add_event(event2)
-    #watcher.add_event(event3)
-    #watcher.set_ordered(True)
-
-    watcher.next_test(next_test)
-
-    event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
-    watcher.add_event(event1)
-    event_send = {'Action' : 'ping'}
-    watcher.add_send_event(event_send)
-
-    reactor.callWhenRunning(watcher.main)
-    callID = reactor.callLater(1, watcher.timeout)
-    reactor.run()
-        
-    watcher.show_events()
-
-    if watcher.check_events():
-        print "DEBUG: all good!"
-        return 0
-    print "DEBUG: FAIL"
-    return 1
-
+    mytest = Test()
+    mytest.main()
+    
 if __name__ == "__main__":
     sys.exit(main() or 0)
 




More information about the asterisk-commits mailing list