From 199a7c79c135da47f6659e233624a45ee39029ca Mon Sep 17 00:00:00 2001 From: Lou Berger Date: Fri, 3 Nov 2017 16:35:51 -0400 Subject: [PATCH] lutil: initial version Signed-off-by: Lou Berger --- tests/topotests/utilities/lutil.py | 255 +++++++++++++++++++++++++++++ 1 file changed, 255 insertions(+) create mode 100755 tests/topotests/utilities/lutil.py diff --git a/tests/topotests/utilities/lutil.py b/tests/topotests/utilities/lutil.py new file mode 100755 index 0000000000..ea0f938eae --- /dev/null +++ b/tests/topotests/utilities/lutil.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python + +# Copyright 2017, LabN Consulting, L.L.C. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; see the file COPYING; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import re +import sys +import time +import datetime +from mininet.net import Mininet + + +# L utility functions +# +# These functions are inteneted to provide support for CI testing within MiniNet +# environments. + +class lUtil: + #to be made configurable in the future + fout_name = 'output.log' + fsum_name = 'summary.txt' + l_level = 9 + base_dir = '.' + + l_total = 0 + l_pass = 0 + l_fail = 0 + l_filename = '' + l_line = 0 + + fout = '' + fsum = '' + net = '' + + def log(self, str): + if self.l_level > 0: + if self.fout == '': + self.fout = open(self.fout_name, 'w', 0) + self.fout.write(str+'\n') + if self.l_level > 5: + print(str) + + def result(self, target, success, str): + if success: + p = 1 + f = 0 + self.l_pass += 1 + else: + f = 1 + p = 0 + self.l_fail += 1 + res = "%-4d %-6s %-56s %-4d %-4d" % (self.l_total, target, str, p, f) + self.log (res) + if self.fsum == '': + self.fsum = open(self.fsum_name, 'w', 0) + self.fsum.write('\ +******************************************************************************\n') + self.fsum.write('\ +Test Target Summary Pass Fail\n') + self.fsum.write('\ +******************************************************************************\n') + self.fsum.write(res+'\n') + + def closeFiles(self): + ret = '\ +******************************************************************************\n\ +Total %-4d %-4d %-4d\n\ +******************************************************************************'\ +% (self.l_total, self.l_pass, self.l_fail) + if self.fsum != '': + self.fsum.write(ret + '\n') + self.fsum.close() + self.fsum = '' + if self.fout != '': + if os.path.isfile(self.fsum_name): + r = open(self.fsum_name, 'r') + self.fout.write(r.read()) + r.close() + self.fout.close() + self.fout = '' + return ret + + def setFilename(self, name): + self.log('FILE: ' + name) + self.l_filename = name + self.line = 0 + + def strToArray(self, string): + a = [] + c = 0 + end = '' + words = string.split() + if len(words) < 1 or words[0].startswith('#'): + return a + words = string.split() + for word in words: + if len(end) == 0: + a.append(word) + else: + a[c] += str(' '+word) + if end == '\\': + end = '' + if not word.endswith('\\'): + if end != '"': + if word.startswith('"'): + end = '"' + else: + c += 1 + else: + if word.endswith('"'): + end = '' + c += 1 + else: + c += 1 + else: + end = '\\' + # if len(end) == 0: + # print('%d:%s:' % (c, a[c-1])) + + return a + + def execTestFile(self, tstFile): + if os.path.isfile(tstFile): + f = open(tstFile) + for line in f: + if len(line) > 1: + a = self.strToArray(line) + if len(a) >= 6: + luCommand(a[1], a[2], a[3], a[4], a[5]) + else: + self.l_line += 1 + self.log('%s:%s %s' % (self.l_filename, self.l_line , line)) + if len(a) >= 2: + if a[0] == 'sleep': + time.sleep(int(a[1])) + elif a[0] == 'include': + self.execTestFile(a[1]) + f.close() + else: + self.log('unable to read: ' + tstFile) + sys.exit(1) + + def command(self, target, command, regexp, op, result): + global net + if op != 'wait': + self.l_line += 1 + self.l_total += 1 + self.log('%s:%s COMMAND:%s:%s:%s:%s:%s:' % \ + (self.l_filename, self.l_line, target, command, regexp, op, result)) + if self.net == '': + return False + #self.log("Running %s %s" % (target, command)) + out = self.net[target].cmd(command).rstrip() + self.log('out:%s:' % out) + out = " ".join(out.splitlines()) + search = re.search(regexp, out) + self.l_last = search + if search == None: + if op == 'fail': + success = True + else: + success = False + ret = success + else: + ret = search.group() + self.log('found:%s:' % ret) + if op != 'fail': + success = True + else: + success = False + if op == 'pass' or op == 'fail': + self.result(target, success, result) + return ret + + def wait(self, target, command, regexp, op, result, wait): + self.log('%s:%s WAIT:%s:%s:%s:%s:%s:%s:' % \ + (self.l_filename, self.l_line, target, command, regexp, op, result,wait)) + llevel = LUtil.l_level + found = False + n = 0 + startt = time.time() + delta = time.time() - startt + while delta < wait and found is False: + found = self.command(target, command, regexp, op, result) + n+=1 + LUtil.l_level = 0 + delta = time.time() - startt + if delta < wait and found is False: + time.sleep (0.5) + LUtil.l_level = llevel + self.log('Done after %d loops, time=%s, Found=%s' % (n, delta, found)) + found = self.command(target, command, regexp, 'pass', '%s +%4.2f secs' % (result, delta)) + return found + +#init class +LUtil=lUtil() + +#entry calls +def luStart(baseDir='.', net='', fout='output.log', fsum='summary.txt', level=9): + LUtil.base_dir = baseDir + LUtil.net = net + if fout != '': + LUtil.fout_name = fout + if fsum != '': + LUtil.fsum_name = fsum + LUtil.l_level = level + +def luCommand(target, command, regexp='.', op='none', result='', time=60): + if op != 'wait': + return LUtil.command(target, command, regexp, op, result) + else: + return LUtil.wait(target, command, regexp, op, result, time) + + +def luInclude(filename): + global LUtil + tstFile = LUtil.base_dir + '/' + filename + LUtil.setFilename(filename) + if filename.endswith('.py'): + execfile(tstFile) + else: + LUtil.execTestFile(tstFile) + +def luFinish(): + return LUtil.closeFiles() + +def luNumFail(): + return LUtil.l_fail + +def luNumPass(): + return LUtil.l_pass + +#for testing +if __name__ == '__main__': + print(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + '/utilities') + luStart() + for arg in sys.argv[1:]: + luInclude(arg) + luFinish() + sys.exit(0) + -- 2.39.5