--- /dev/null
+#!/usr/bin/env python
+
+# Copyright 2017, LabN Consulting, L.L.C.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; see the file COPYING; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+import re
+import sys
+import time
+import datetime
+from mininet.net import Mininet
+
+
+# L utility functions
+#
+# These functions are inteneted to provide support for CI testing within MiniNet
+# environments.
+
+class lUtil:
+ #to be made configurable in the future
+ fout_name = 'output.log'
+ fsum_name = 'summary.txt'
+ l_level = 9
+ base_dir = '.'
+
+ l_total = 0
+ l_pass = 0
+ l_fail = 0
+ l_filename = ''
+ l_line = 0
+
+ fout = ''
+ fsum = ''
+ net = ''
+
+ def log(self, str):
+ if self.l_level > 0:
+ if self.fout == '':
+ self.fout = open(self.fout_name, 'w', 0)
+ self.fout.write(str+'\n')
+ if self.l_level > 5:
+ print(str)
+
+ def result(self, target, success, str):
+ if success:
+ p = 1
+ f = 0
+ self.l_pass += 1
+ else:
+ f = 1
+ p = 0
+ self.l_fail += 1
+ res = "%-4d %-6s %-56s %-4d %-4d" % (self.l_total, target, str, p, f)
+ self.log (res)
+ if self.fsum == '':
+ self.fsum = open(self.fsum_name, 'w', 0)
+ self.fsum.write('\
+******************************************************************************\n')
+ self.fsum.write('\
+Test Target Summary Pass Fail\n')
+ self.fsum.write('\
+******************************************************************************\n')
+ self.fsum.write(res+'\n')
+
+ def closeFiles(self):
+ ret = '\
+******************************************************************************\n\
+Total %-4d %-4d %-4d\n\
+******************************************************************************'\
+% (self.l_total, self.l_pass, self.l_fail)
+ if self.fsum != '':
+ self.fsum.write(ret + '\n')
+ self.fsum.close()
+ self.fsum = ''
+ if self.fout != '':
+ if os.path.isfile(self.fsum_name):
+ r = open(self.fsum_name, 'r')
+ self.fout.write(r.read())
+ r.close()
+ self.fout.close()
+ self.fout = ''
+ return ret
+
+ def setFilename(self, name):
+ self.log('FILE: ' + name)
+ self.l_filename = name
+ self.line = 0
+
+ def strToArray(self, string):
+ a = []
+ c = 0
+ end = ''
+ words = string.split()
+ if len(words) < 1 or words[0].startswith('#'):
+ return a
+ words = string.split()
+ for word in words:
+ if len(end) == 0:
+ a.append(word)
+ else:
+ a[c] += str(' '+word)
+ if end == '\\':
+ end = ''
+ if not word.endswith('\\'):
+ if end != '"':
+ if word.startswith('"'):
+ end = '"'
+ else:
+ c += 1
+ else:
+ if word.endswith('"'):
+ end = ''
+ c += 1
+ else:
+ c += 1
+ else:
+ end = '\\'
+ # if len(end) == 0:
+ # print('%d:%s:' % (c, a[c-1]))
+
+ return a
+
+ def execTestFile(self, tstFile):
+ if os.path.isfile(tstFile):
+ f = open(tstFile)
+ for line in f:
+ if len(line) > 1:
+ a = self.strToArray(line)
+ if len(a) >= 6:
+ luCommand(a[1], a[2], a[3], a[4], a[5])
+ else:
+ self.l_line += 1
+ self.log('%s:%s %s' % (self.l_filename, self.l_line , line))
+ if len(a) >= 2:
+ if a[0] == 'sleep':
+ time.sleep(int(a[1]))
+ elif a[0] == 'include':
+ self.execTestFile(a[1])
+ f.close()
+ else:
+ self.log('unable to read: ' + tstFile)
+ sys.exit(1)
+
+ def command(self, target, command, regexp, op, result):
+ global net
+ if op != 'wait':
+ self.l_line += 1
+ self.l_total += 1
+ self.log('%s:%s COMMAND:%s:%s:%s:%s:%s:' % \
+ (self.l_filename, self.l_line, target, command, regexp, op, result))
+ if self.net == '':
+ return False
+ #self.log("Running %s %s" % (target, command))
+ out = self.net[target].cmd(command).rstrip()
+ self.log('out:%s:' % out)
+ out = " ".join(out.splitlines())
+ search = re.search(regexp, out)
+ self.l_last = search
+ if search == None:
+ if op == 'fail':
+ success = True
+ else:
+ success = False
+ ret = success
+ else:
+ ret = search.group()
+ self.log('found:%s:' % ret)
+ if op != 'fail':
+ success = True
+ else:
+ success = False
+ if op == 'pass' or op == 'fail':
+ self.result(target, success, result)
+ return ret
+
+ def wait(self, target, command, regexp, op, result, wait):
+ self.log('%s:%s WAIT:%s:%s:%s:%s:%s:%s:' % \
+ (self.l_filename, self.l_line, target, command, regexp, op, result,wait))
+ llevel = LUtil.l_level
+ found = False
+ n = 0
+ startt = time.time()
+ delta = time.time() - startt
+ while delta < wait and found is False:
+ found = self.command(target, command, regexp, op, result)
+ n+=1
+ LUtil.l_level = 0
+ delta = time.time() - startt
+ if delta < wait and found is False:
+ time.sleep (0.5)
+ LUtil.l_level = llevel
+ self.log('Done after %d loops, time=%s, Found=%s' % (n, delta, found))
+ found = self.command(target, command, regexp, 'pass', '%s +%4.2f secs' % (result, delta))
+ return found
+
+#init class
+LUtil=lUtil()
+
+#entry calls
+def luStart(baseDir='.', net='', fout='output.log', fsum='summary.txt', level=9):
+ LUtil.base_dir = baseDir
+ LUtil.net = net
+ if fout != '':
+ LUtil.fout_name = fout
+ if fsum != '':
+ LUtil.fsum_name = fsum
+ LUtil.l_level = level
+
+def luCommand(target, command, regexp='.', op='none', result='', time=60):
+ if op != 'wait':
+ return LUtil.command(target, command, regexp, op, result)
+ else:
+ return LUtil.wait(target, command, regexp, op, result, time)
+
+
+def luInclude(filename):
+ global LUtil
+ tstFile = LUtil.base_dir + '/' + filename
+ LUtil.setFilename(filename)
+ if filename.endswith('.py'):
+ execfile(tstFile)
+ else:
+ LUtil.execTestFile(tstFile)
+
+def luFinish():
+ return LUtil.closeFiles()
+
+def luNumFail():
+ return LUtil.l_fail
+
+def luNumPass():
+ return LUtil.l_pass
+
+#for testing
+if __name__ == '__main__':
+ print(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + '/utilities')
+ luStart()
+ for arg in sys.argv[1:]:
+ luInclude(arg)
+ luFinish()
+ sys.exit(0)
+