[asterisk-commits] jpeeler: branch jpeeler/event_watcher r253 - /asterisk/team/jpeeler/event_wat...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Apr 27 15:00:39 CDT 2010
Author: jpeeler
Date: Tue Apr 27 15:00:35 2010
New Revision: 253
URL: http://svnview.digium.com/svn/testsuite?view=rev&rev=253
Log:
commit this before converting (probably) test module into a class
Modified:
asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py
Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py?view=diff&rev=253&r1=252&r2=253
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/client.py Tue Apr 27 15:00:35 2010
@@ -8,14 +8,27 @@
class EventWatcher(propertied.Propertied):
- def __init__(self):
+ def retard(self):
+ print "DEBUG: I don't know python, can I call this?"
+
+ def clear_vars(self):
+
self.eventList = list()
self.count = 0
+ self.sendEventList = list()
self.ordered = False
+
+ def __init__(self):
self.log = logging.getLogger('TestAMI')
self.log.setLevel(logging.INFO)
- self.sendEventList = list()
self.ami = None
+ self.testcount = 0
+ #self.next_test = None
+
+ self.clear_vars()
+
+ def next_test(self, cb):
+ self.next_test = cb
def set_ordered(self, ordered):
self.ordered = ordered
@@ -52,6 +65,7 @@
return True
def timeout(self):
+ print "DEBUG: timed out"
reactor.stop()
def dict_in_dict(self, d_set, d_subset):
@@ -63,6 +77,12 @@
amiDF = APPLICATION.amiSpecifier.login().addCallbacks(self.onAMIConnect, self.onAMIFailure)
self.show_events()
+
+ def sendEvents(self, ami):
+ if self.sendEventList:
+ for amessage in self.sendEventList:
+ id = ami.sendMessage(amessage, self.onSentResponse)
+ self.log.debug("!!!!!!!!!!!!!!!! SENDING %s with id %s" % (amessage, id))
def onAMIFailure(self, ami):
self.log.critical("Stop asterisk")
@@ -82,16 +102,15 @@
ami.registerEvent(None, self.onAnyEvent)
self.ami = ami
- if self.sendEventList:
- for amessage in self.sendEventList:
- id = ami.sendMessage(amessage, self.onSentResponse)
- self.log.debug("SENDING %s with id %s" % (amessage, id))
+ self.sendEvents(ami)
def onSentResponse(self, result):
print "result ---------------------------->", result
# don't care about connection terminated event
- if self.ami:
+ if result.__str__() != "Connection was closed cleanly: FastAGI connection terminated.":
self.onAnyEvent(self.ami, result)
+ else:
+ print "DEBUG: aborted because connection close event"
def connectionLost(reason):
self.log.critical("UNPLANNED? connection lost")
@@ -124,13 +143,43 @@
self.stopMonitoring(ami)
continue
+ def end_test(self):
+ if reactor.running:
+ reactor.stop()
+ self.log.critical("DEBUG: DONE, end_test")
+
def stopMonitoring(self, ami):
self.log.debug("About to shut down all monitoring")
- self.ami = None
+
+ #self.ami = None
+ #ami.deregisterEvent(None, None)
+ #self.end_test()
+
ami.deregisterEvent(None, None)
- if reactor.running:
- reactor.stop()
- self.log.critical("DONE")
+
+ passed = self.check_events()
+ if not passed:
+ print "TEST FAILED"
+ self.end_test()
+ return
+ else:
+ print "DEBUG: TEST GOOD"
+
+ self.clear_vars()
+ toexec = "test" + str(self.testcount)
+ res = self.next_test(self, self.testcount, passed, toexec)
+ print "DEBUG: res %s from exec %s" % (res, toexec)
+ if res == -1:
+ self.ami = None
+ ami.deregisterEvent(None, None)
+ self.end_test()
+ return
+ self.testcount = self.testcount + 1
+ ami.registerEvent(None, self.onAnyEvent)
+ self.sendEvents(ami)
+
+
+ # TODO: need to have reactor call later rescheduling
def onStatus(self, events, ami=None):
self.log.debug("Initial channel status retrieved")
Modified: asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py
URL: http://svnview.digium.com/svn/testsuite/asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py?view=diff&rev=253&r1=252&r2=253
==============================================================================
--- asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py (original)
+++ asterisk/team/jpeeler/event_watcher/tests/ami-monitor/test.py Tue Apr 27 15:00:35 2010
@@ -10,6 +10,44 @@
from basicproperty import common, propertied, basic
import subprocess
+def test0(watcher):
+ print "WOOT! - test0"
+ event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+ watcher.add_event(event1)
+ event_send = {'Action' : 'ping'}
+ watcher.add_send_event(event_send)
+ print "ENDWOOT"
+
+def test1(watcher):
+ print "WOOT2! - test1"
+ sleep(3) # DELETE THIS
+ event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
+ watcher.add_event(event1)
+ event_send = {'Action' : 'ping'}
+ watcher.add_send_event(event_send)
+ print "ENDWOOT2"
+
+def next_test(watcher, count, passed, toexec):
+ print "NEXT TEST YAYA ", count, passed
+ if not passed:
+ return 1
+
+ #TODO: use hasattr
+ print "about to see...", toexec
+ try:
+ obj = eval(toexec)
+ except NameError:
+ print "DEBUG: no more tests"
+ return -1
+ print "obj found ", obj
+ if callable(obj):
+ print "DEBUG: determined next test exists"
+ eval(toexec)(watcher)
+ if len(watcher.eventList) > 0:
+ return 0
+
+ print "DEBUG: returning, no events..."
+ return -1 # exception or something
def main():
logging.basicConfig()
@@ -35,14 +73,15 @@
#watcher.add_event(event3)
#watcher.set_ordered(True)
+ watcher.next_test(next_test)
+
event1 = [{'response' : 'Success', 'ping' : 'Pong'}]
watcher.add_event(event1)
-
event_send = {'Action' : 'ping'}
watcher.add_send_event(event_send)
reactor.callWhenRunning(watcher.main)
- reactor.callLater(5, watcher.timeout)
+ callID = reactor.callLater(1, watcher.timeout)
reactor.run()
watcher.show_events()
More information about the asterisk-commits
mailing list