[asterisk-commits] jpeeler: branch jpeeler/event_watcher r253 - /asterisk/team/jpeeler/event_wat...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Apr 27 15:00:39 CDT 2010


Author: jpeeler
Date: Tue Apr 27 15:00:35 2010
New Revision: 253

URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=253
Log:
commit this before converting (probably) test module into a class

Modified:
    asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
    asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py

Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py?view=diff&rev=253&r1=252&r2=253
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py Tue Apr 27 15:00:35 2010
@@ -8,14 +8,27 @@
 
 class EventWatcher(propertied.Propertied):
 
-    def __init__(self):
+    def retard(self):
+        print "DEBUG: I don't know python, can I call this?"
+
+    def clear_vars(self):
+ 
         self.eventList = list()
         self.count = 0
+        self.sendEventList = list()
         self.ordered = False
+
+    def __init__(self):
         self.log = logging.getLogger('TestAMI')
         self.log.setLevel(logging.INFO)
-        self.sendEventList = list()
         self.ami = None
+        self.testcount = 0
+        #self.next_test = None
+
+        self.clear_vars()
+
+    def next_test(self, cb):
+        self.next_test = cb
 
     def set_ordered(self, ordered):
         self.ordered = ordered
@@ -52,6 +65,7 @@
         return True
 
     def timeout(self):
+        print "DEBUG: timed out"
         reactor.stop()
 
     def dict_in_dict(self, d_set, d_subset):
@@ -63,6 +77,12 @@
         amiDF = APPLICATION.amiSpecifier.login().addCallbacks(self.onAMIConnect, self.onAMIFailure)
 
         self.show_events()
+
+    def sendEvents(self, ami):
+        if self.sendEventList:
+            for amessage in self.sendEventList:
+                id = ami.sendMessage(amessage, self.onSentResponse)
+                self.log.debug("!!!!!!!!!!!!!!!! SENDING %s with id %s" % (amessage, id))
 
     def onAMIFailure(self, ami):
         self.log.critical("Stop asterisk")
@@ -82,16 +102,15 @@
         ami.registerEvent(None, self.onAnyEvent)
 
         self.ami = ami
-        if self.sendEventList:
-            for amessage in self.sendEventList:
-                id = ami.sendMessage(amessage, self.onSentResponse)
-                self.log.debug("SENDING %s with id %s" % (amessage, id))
+        self.sendEvents(ami)
 
     def onSentResponse(self, result):
         print "result ---------------------------->", result
         # don't care about connection terminated event
-        if self.ami:
+        if result.__str__() != "Connection was closed cleanly: FastAGI connection terminated.":
         	self.onAnyEvent(self.ami, result)
+        else:
+            print "DEBUG: aborted because connection close event"
 
     def connectionLost(reason):
         self.log.critical("UNPLANNED? connection lost")
@@ -124,13 +143,43 @@
                         self.stopMonitoring(ami)
                     continue
 
+    def end_test(self):
+        if reactor.running:
+            reactor.stop()
+        self.log.critical("DEBUG: DONE, end_test")
+
     def stopMonitoring(self, ami):
         self.log.debug("About to shut down all monitoring")
-        self.ami = None
+
+        #self.ami = None
+        #ami.deregisterEvent(None, None)
+        #self.end_test()
+
         ami.deregisterEvent(None, None)
-        if reactor.running:
-            reactor.stop()
-        self.log.critical("DONE")
+
+        passed = self.check_events()
+        if not passed:
+            print "TEST FAILED"
+            self.end_test()
+            return
+        else:
+            print "DEBUG: TEST GOOD"
+
+        self.clear_vars()
+        toexec = "test" + str(self.testcount)
+        res = self.next_test(self, self.testcount, passed, toexec)
+        print "DEBUG: res %s from exec %s" % (res, toexec)
+        if res == -1:
+            self.ami = None
+            ami.deregisterEvent(None, None)
+            self.end_test()
+            return
+        self.testcount = self.testcount + 1
+        ami.registerEvent(None, self.onAnyEvent)
+        self.sendEvents(ami)
+
+
+        # TODO: need to have reactor call later rescheduling
 
     def onStatus(self, events, ami=None):
         self.log.debug("Initial channel status retrieved")

Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py?view=diff&rev=253&r1=252&r2=253
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py Tue Apr 27 15:00:35 2010
@@ -10,6 +10,44 @@
 from basicproperty import common, propertied, basic
 import subprocess
 
+def test0(watcher):
+    print "WOOT! - test0"
+    event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+    watcher.add_event(event1)
+    event_send = {'Action' : 'ping'}
+    watcher.add_send_event(event_send)
+    print "ENDWOOT"
+
+def test1(watcher):
+    print "WOOT2! - test1"
+    sleep(3) # DELETE THIS
+    event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+    watcher.add_event(event1)
+    event_send = {'Action' : 'ping'}
+    watcher.add_send_event(event_send)
+    print "ENDWOOT2"
+
+def next_test(watcher, count, passed, toexec):
+    print "NEXT TEST YAYA ", count, passed
+    if not passed:
+        return 1
+
+    #TODO: use hasattr
+    print "about to see...", toexec
+    try:
+        obj = eval(toexec)
+    except NameError:
+        print "DEBUG: no more tests"
+        return -1
+    print "obj found ", obj
+    if callable(obj):
+        print "DEBUG: determined next test exists"
+        eval(toexec)(watcher)
+        if len(watcher.eventList) > 0:
+            return 0
+
+    print "DEBUG: returning, no events..."
+    return -1 # exception or something
 
 def main():
     logging.basicConfig()
@@ -35,14 +73,15 @@
     #watcher.add_event(event3)
     #watcher.set_ordered(True)
 
+    watcher.next_test(next_test)
+
     event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
     watcher.add_event(event1)
-
     event_send = {'Action' : 'ping'}
     watcher.add_send_event(event_send)
 
     reactor.callWhenRunning(watcher.main)
-    reactor.callLater(5, watcher.timeout)
+    callID = reactor.callLater(1, watcher.timeout)
     reactor.run()
         
     watcher.show_events()




More information about the asterisk-commits mailing list