diff options
Diffstat (limited to 'tests/topotests/lib/lutil.py')
| -rwxr-xr-x | tests/topotests/lib/lutil.py | 349 |
1 files changed, 349 insertions, 0 deletions
diff --git a/tests/topotests/lib/lutil.py b/tests/topotests/lib/lutil.py new file mode 100755 index 0000000000..3ae18018bf --- /dev/null +++ b/tests/topotests/lib/lutil.py @@ -0,0 +1,349 @@ +#!/usr/bin/env python + +# Copyright 2017, LabN Consulting, L.L.C. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; see the file COPYING; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import re +import sys +import time +import datetime +import json +from topolog import logger +from mininet.net import Mininet + + +# L utility functions +# +# These functions are inteneted to provide support for CI testing within MiniNet +# environments. + +class lUtil: + #to be made configurable in the future + base_script_dir = '.' + base_log_dir = '.' + fout_name = 'output.log' + fsum_name = 'summary.txt' + l_level = 9 + CallOnFail = False + + l_total = 0 + l_pass = 0 + l_fail = 0 + l_filename = '' + l_last = None + l_line = 0 + l_dotall_experiment = False + l_last_nl = None + + fout = '' + fsum = '' + net = '' + + def log(self, str): + if self.l_level > 0: + if self.fout == '': + self.fout = open(self.fout_name, 'w', 0) + self.fout.write(str+'\n') + if self.l_level > 5: + print(str) + + def summary(self, str): + if self.fsum == '': + self.fsum = open(self.fsum_name, 'w', 0) + self.fsum.write('\ +******************************************************************************\n') + self.fsum.write('\ +Test Target Summary Pass Fail\n') + self.fsum.write('\ +******************************************************************************\n') + self.fsum.write(str+'\n') + + def result(self, target, success, str, logstr=None): + if success: + p = 1 + f = 0 + self.l_pass += 1 + sstr = "PASS" + else: + f = 1 + p = 0 + self.l_fail += 1 + sstr = "FAIL" + self.l_total += 1 + if logstr != None: + self.log("R:%d %s: %s" % (self.l_total, sstr, logstr)) + res = "%-4d %-6s %-56s %-4d %d" % (self.l_total, target, str, p, f) + self.log ('R:'+res) + self.summary(res) + if f == 1 and self.CallOnFail != False: + self.CallOnFail() + + def closeFiles(self): + ret = '\ +******************************************************************************\n\ +Total %-4d %-4d %d\n\ +******************************************************************************'\ +% (self.l_total, self.l_pass, self.l_fail) + if self.fsum != '': + self.fsum.write(ret + '\n') + self.fsum.close() + self.fsum = '' + if self.fout != '': + if os.path.isfile(self.fsum_name): + r = open(self.fsum_name, 'r') + self.fout.write(r.read()) + r.close() + self.fout.close() + self.fout = '' + return ret + + def setFilename(self, name): + str = 'FILE: ' + name + self.log(str) + self.summary(str) + self.l_filename = name + self.line = 0 + + def getCallOnFail(self): + return self.CallOnFail + + def setCallOnFail(self, CallOnFail): + self.CallOnFail = CallOnFail + + def strToArray(self, string): + a = [] + c = 0 + end = '' + words = string.split() + if len(words) < 1 or words[0].startswith('#'): + return a + words = string.split() + for word in words: + if len(end) == 0: + a.append(word) + else: + a[c] += str(' '+word) + if end == '\\': + end = '' + if not word.endswith('\\'): + if end != '"': + if word.startswith('"'): + end = '"' + else: + c += 1 + else: + if word.endswith('"'): + end = '' + c += 1 + else: + c += 1 + else: + end = '\\' + # if len(end) == 0: + # print('%d:%s:' % (c, a[c-1])) + + return a + + def execTestFile(self, tstFile): + if os.path.isfile(tstFile): + f = open(tstFile) + for line in f: + if len(line) > 1: + a = self.strToArray(line) + if len(a) >= 6: + luCommand(a[1], a[2], a[3], a[4], a[5]) + else: + self.l_line += 1 + self.log('%s:%s %s' % (self.l_filename, self.l_line , line)) + if len(a) >= 2: + if a[0] == 'sleep': + time.sleep(int(a[1])) + elif a[0] == 'include': + self.execTestFile(a[1]) + f.close() + else: + self.log('unable to read: ' + tstFile) + sys.exit(1) + + def command(self, target, command, regexp, op, result, returnJson): + global net + if op != 'wait': + self.l_line += 1 + self.log('(#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:' % \ + (self.l_total+1, + self.l_filename, self.l_line, target, command, regexp, op, result)) + if self.net == '': + return False + #self.log("Running %s %s" % (target, command)) + js = None + out = self.net[target].cmd(command).rstrip() + if len(out) == 0: + report = "<no output>" + else: + report = out + if returnJson == True: + try: + js = json.loads(out) + except: + js = None + self.log('WARNING: JSON load failed -- confirm command output is in JSON format.') + self.log('COMMAND OUTPUT:%s:' % report) + + # Experiment: can we achieve the same match behavior via DOTALL + # without converting newlines to spaces? + out_nl = out + search_nl = re.search(regexp, out_nl, re.DOTALL); + self.l_last_nl = search_nl + # Set up for comparison + if search_nl != None: + group_nl = search_nl.group() + group_nl_converted = " ".join(group_nl.splitlines()) + else: + group_nl_converted = None + + out = " ".join(out.splitlines()) + search = re.search(regexp, out) + self.l_last = search + if search == None: + if op == 'fail': + success = True + else: + success = False + ret = success + else: + ret = search.group() + self.log('found:%s:' % ret) + if op != 'fail': + success = True + else: + success = False + # Experiment: compare matched strings obtained each way + if self.l_dotall_experiment and (group_nl_converted != ret): + self.log('DOTALL experiment: strings differ dotall=[%s] orig=[%s]' % (group_nl_converted, ret)) + if op == 'pass' or op == 'fail': + self.result(target, success, result) + if js != None: + return js + return ret + + def wait(self, target, command, regexp, op, result, wait, returnJson): + self.log('%s:%s WAIT:%s:%s:%s:%s:%s:%s:' % \ + (self.l_filename, self.l_line, target, command, regexp, op, result,wait)) + llevel = LUtil.l_level + found = False + n = 0 + startt = time.time() + delta = time.time() - startt + while delta < wait and found is False: + found = self.command(target, command, regexp, op, result, returnJson) + n+=1 + LUtil.l_level = 0 + delta = time.time() - startt + if delta < wait and found is False: + time.sleep (0.5) + LUtil.l_level = llevel + self.log('Done after %d loops, time=%s, Found=%s' % (n, delta, found)) + found = self.command(target, command, regexp, 'pass', '%s +%4.2f secs' % (result, delta), returnJson) + return found + +#initialized by luStart +LUtil=None + +#entry calls +def luStart(baseScriptDir='.', baseLogDir='.', net='', + fout='output.log', fsum='summary.txt', level=9): + global LUtil + #init class + LUtil=lUtil() + LUtil.base_script_dir = baseScriptDir + LUtil.base_log_dir = baseLogDir + LUtil.net = net + if fout != '': + LUtil.fout_name = baseLogDir + '/' + fout + if fsum != None: + LUtil.fsum_name = baseLogDir + '/' + fsum + LUtil.l_level = level + LUtil.l_dotall_experiment = False + LUtil.l_dotall_experiment = True + +def luCommand(target, command, regexp='.', op='none', result='', time=10, returnJson=False): + if op != 'wait': + return LUtil.command(target, command, regexp, op, result, returnJson) + else: + return LUtil.wait(target, command, regexp, op, result, time, returnJson) + +def luLast(usenl=False): + if usenl: + if LUtil.l_last_nl != None: + LUtil.log('luLast:%s:' % LUtil.l_last_nl.group()) + return LUtil.l_last_nl + else: + if LUtil.l_last != None: + LUtil.log('luLast:%s:' % LUtil.l_last.group()) + return LUtil.l_last + +def luInclude(filename, CallOnFail=None): + tstFile = LUtil.base_script_dir + '/' + filename + LUtil.setFilename(filename) + if CallOnFail != None: + oldCallOnFail = LUtil.getCallOnFail() + LUtil.setCallOnFail(CallOnFail) + if filename.endswith('.py'): + LUtil.log("luInclude: execfile "+tstFile) + execfile(tstFile) + else: + LUtil.log("luInclude: execTestFile "+tstFile) + LUtil.execTestFile(tstFile) + if CallOnFail != None: + LUtil.setCallOnFail(oldCallOnFail) + +def luFinish(): + global LUtil + ret = LUtil.closeFiles() + #done + LUtil = None + return ret; + +def luNumFail(): + return LUtil.l_fail + +def luNumPass(): + return LUtil.l_pass + +def luResult(target, success, str, logstr=None): + return LUtil.result(target, success, str, logstr) + +def luShowFail(): + printed = 0 + sf = open(LUtil.fsum_name, 'r') + for line in sf: + if line[-2] != "0": + printed+=1 + logger.error(line.rstrip()) + sf.close() + if printed > 0: + logger.error("See %s for details of errors" % LUtil.fout_name) + +#for testing +if __name__ == '__main__': + print(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + '/lib') + luStart() + for arg in sys.argv[1:]: + luInclude(arg) + luFinish() + sys.exit(0) + |
