net['r%s' % i].loadConf('ospfd', '%s/r%s/ospfd.conf' % (thisDir, i))
if net['r1'].checkRouterVersion('<', '4.0'):
net['r%s' % i].loadConf('ospf6d', '%s/r%s/ospf6d.conf-pre-v4' % (thisDir, i))
- else:
- net['r%s' % i].loadConf('ospf6d', '%s/r%s/ospf6d.conf' % (thisDir, i))
+ else:
+ net['r%s' % i].loadConf('ospf6d', '%s/r%s/ospf6d.conf' % (thisDir, i))
net['r%s' % i].loadConf('isisd', '%s/r%s/isisd.conf' % (thisDir, i))
net['r%s' % i].loadConf('bgpd', '%s/r%s/bgpd.conf' % (thisDir, i))
if net['r%s' % i].daemon_available('ldpd'):
actual = net['r%s' % i].cmd('vtysh -c "show ip ospf interface" 2> /dev/null').rstrip()
# Mask out Bandwidth portion. They may change..
actual = re.sub(r"BW [0-9]+ Mbit", "BW XX Mbit", actual)
- actual = re.sub(r"ifindex [0-9]", "ifindex X", actual)
+ actual = re.sub(r"ifindex [0-9]", "ifindex X", actual)
# Drop time in next due
actual = re.sub(r"Hello due in [0-9\.]+s", "Hello due in XX.XXXs", actual)
print("******************************************\n")
diffresult = {}
for i in range(1, 2):
- success = 0
- for refTableFile in (glob.glob(
- '%s/r%s/show_bgp_ipv4*.ref' % (thisDir, i))):
- if os.path.isfile(refTableFile):
- # Read expected result from file
- expected = open(refTableFile).read().rstrip()
- # Fix newlines (make them all the same)
- expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
-
- # Actual output from router
- actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv4" 2> /dev/null').rstrip()
- # Remove summary line (changed recently)
- actual = re.sub(r'Total number.*', '', actual)
- actual = re.sub(r'Displayed.*', '', actual)
- actual = actual.rstrip()
- # Fix newlines (make them all the same)
- actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
-
- # Generate Diff
- diff = topotest.get_textdiff(actual, expected,
- title1="actual SHOW BGP IPv4",
- title2="expected SHOW BGP IPv4")
-
- # Empty string if it matches, otherwise diff contains unified diff
- if diff:
- diffresult[refTableFile] = diff
- else:
- success = 1
- print("template %s matched: r%s ok" % (refTableFile, i))
- break
-
- if not success:
- resultstr = 'No template matched.\n'
- for f in diffresult.iterkeys():
- resultstr += (
- 'template %s: r%s failed SHOW BGP IPv4 check:\n%s\n'
- % (f, i, diffresult[f]))
- raise AssertionError(
- "SHOW BGP IPv4 failed for router r%s:\n%s" % (i, resultstr))
+ success = 0
+ for refTableFile in (glob.glob(
+ '%s/r%s/show_bgp_ipv4*.ref' % (thisDir, i))):
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv4" 2> /dev/null').rstrip()
+ # Remove summary line (changed recently)
+ actual = re.sub(r'Total number.*', '', actual)
+ actual = re.sub(r'Displayed.*', '', actual)
+ actual = actual.rstrip()
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = topotest.get_textdiff(actual, expected,
+ title1="actual SHOW BGP IPv4",
+ title2="expected SHOW BGP IPv4")
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ diffresult[refTableFile] = diff
+ else:
+ success = 1
+ print("template %s matched: r%s ok" % (refTableFile, i))
+ break
+
+ if not success:
+ resultstr = 'No template matched.\n'
+ for f in diffresult.iterkeys():
+ resultstr += (
+ 'template %s: r%s failed SHOW BGP IPv4 check:\n%s\n'
+ % (f, i, diffresult[f]))
+ raise AssertionError(
+ "SHOW BGP IPv4 failed for router r%s:\n%s" % (i, resultstr))
# Make sure that all daemons are running
for i in range(1, 2):
print("******************************************\n")
diffresult = {}
for i in range(1, 2):
- success = 0
- for refTableFile in (glob.glob(
- '%s/r%s/show_bgp_ipv6*.ref' % (thisDir, i))):
- if os.path.isfile(refTableFile):
- # Read expected result from file
- expected = open(refTableFile).read().rstrip()
- # Fix newlines (make them all the same)
- expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
-
- # Actual output from router
- actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv6" 2> /dev/null').rstrip()
- # Remove summary line (changed recently)
- actual = re.sub(r'Total number.*', '', actual)
- actual = re.sub(r'Displayed.*', '', actual)
- actual = actual.rstrip()
- # Fix newlines (make them all the same)
- actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
-
- # Generate Diff
- diff = topotest.get_textdiff(actual, expected,
- title1="actual SHOW BGP IPv6",
- title2="expected SHOW BGP IPv6")
-
- # Empty string if it matches, otherwise diff contains unified diff
- if diff:
- diffresult[refTableFile] = diff
- else:
- success = 1
- print("template %s matched: r%s ok" % (refTableFile, i))
-
- if not success:
- resultstr = 'No template matched.\n'
- for f in diffresult.iterkeys():
- resultstr += (
- 'template %s: r%s failed SHOW BGP IPv6 check:\n%s\n'
- % (f, i, diffresult[f]))
- raise AssertionError(
- "SHOW BGP IPv6 failed for router r%s:\n%s" % (i, resultstr))
+ success = 0
+ for refTableFile in (glob.glob(
+ '%s/r%s/show_bgp_ipv6*.ref' % (thisDir, i))):
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv6" 2> /dev/null').rstrip()
+ # Remove summary line (changed recently)
+ actual = re.sub(r'Total number.*', '', actual)
+ actual = re.sub(r'Displayed.*', '', actual)
+ actual = actual.rstrip()
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = topotest.get_textdiff(actual, expected,
+ title1="actual SHOW BGP IPv6",
+ title2="expected SHOW BGP IPv6")
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ diffresult[refTableFile] = diff
+ else:
+ success = 1
+ print("template %s matched: r%s ok" % (refTableFile, i))
+
+ if not success:
+ resultstr = 'No template matched.\n'
+ for f in diffresult.iterkeys():
+ resultstr += (
+ 'template %s: r%s failed SHOW BGP IPv6 check:\n%s\n'
+ % (f, i, diffresult[f]))
+ raise AssertionError(
+ "SHOW BGP IPv6 failed for router r%s:\n%s" % (i, resultstr))
# Make sure that all daemons are running
for i in range(1, 2):
vtep_ips.append(vtep["vtep_ip"])
if "local" in types:
- result = check_local_es(esi, vtep_ips, dut.name, [])
+ result = check_local_es(esi, vtep_ips, dut.name, [])
else:
- result = check_remote_es(esi, vtep_ips, dut.name, [])
+ result = check_remote_es(esi, vtep_ips, dut.name, [])
if result:
return result
logger.info('result: '+output);
router = tgen.gears['r1']
- for rname, router in router_list.iteritems():
+ for rname, router in router_list.items():
if rname == 'r1':
router.load_config(
TopoRouter.RD_ZEBRA,
if code not in ["P", "A", "U", "Q", "R", "r", "s"]:
continue
- if not result.has_key(code):
+ if code not in result:
result[code] = {}
# Split network from the rest
1. RD verification (manual/auto).
2. RT verification(manual)
3. In an active/standby EVPN implementation, if active DCG goes down,
- secondary takes over.
+ secondary takes over.
4. EVPN routes are advertised/withdrawn, based on VNFs
- advertising/withdrawing IP prefixes.
+ advertising/withdrawing IP prefixes.
5. Route-map operations for EVPN address family.
6. BGP attributes for EVPN address-family.
"""
dist = platform.dist()
if (dist[1] == "16.04"):
- pytest.skip("Kernel not supported for vrf")
+ pytest.skip("Kernel not supported for vrf")
"Check whether all expected routes are present and installed in the OS"
tgen = get_topogen()
dist = platform.dist()
if (dist[1] == "16.04"):
- pytest.skip("Kernel not supported for vrf")
+ pytest.skip("Kernel not supported for vrf")
"Check whether all expected routes are present and installed in the OS"
tgen = get_topogen()
from tempfile import mkdtemp
import os
+import io
import sys
-import ConfigParser
import traceback
import socket
import ipaddress
import platform
-
if sys.version_info[0] > 2:
- import io
import configparser
else:
import StringIO
if SkipIfFailed and tgen.routers_have_failure():
pytest.skip(tgen.errors)
if numEntry > 0:
- if not KeepGoing:
- pytest.skip("Have %d errors" % numEntry)
+ if not KeepGoing:
+ pytest.skip("Have %d errors" % numEntry)
if CheckFuncStr != None:
check = eval(CheckFuncStr)
if numFail > 0:
luShowFail()
fatal_error = "%d tests failed" % numFail
- if not KeepGoing:
- assert "scripts/cleanup_all.py failed" == "See summary output above", fatal_error
+ if not KeepGoing:
+ assert "scripts/cleanup_all.py failed" == "See summary output above", fatal_error
# Memory leak test template
def test_memory_leak():
self.log('WARNING: JSON load failed -- confirm command output is in JSON format.')
self.log('COMMAND OUTPUT:%s:' % report)
- # Experiment: can we achieve the same match behavior via DOTALL
- # without converting newlines to spaces?
- out_nl = out
- search_nl = re.search(regexp, out_nl, re.DOTALL);
- self.l_last_nl = search_nl
- # Set up for comparison
- if search_nl != None:
- group_nl = search_nl.group()
- group_nl_converted = " ".join(group_nl.splitlines())
+ # Experiment: can we achieve the same match behavior via DOTALL
+ # without converting newlines to spaces?
+ out_nl = out
+ search_nl = re.search(regexp, out_nl, re.DOTALL);
+ self.l_last_nl = search_nl
+ # Set up for comparison
+ if search_nl != None:
+ group_nl = search_nl.group()
+ group_nl_converted = " ".join(group_nl.splitlines())
else:
- group_nl_converted = None
+ group_nl_converted = None
out = " ".join(out.splitlines())
search = re.search(regexp, out)
success = False
level = 5
self.log('found:%s:' % ret, level)
- # Experiment: compare matched strings obtained each way
- if self.l_dotall_experiment and (group_nl_converted != ret):
- self.log('DOTALL experiment: strings differ dotall=[%s] orig=[%s]' % (group_nl_converted, ret), 9)
+ # Experiment: compare matched strings obtained each way
+ if self.l_dotall_experiment and (group_nl_converted != ret):
+ self.log('DOTALL experiment: strings differ dotall=[%s] orig=[%s]' % (group_nl_converted, ret), 9)
if op == 'pass' or op == 'fail':
self.result(target, success, result)
if js != None:
def luLast(usenl=False):
if usenl:
- if LUtil.l_last_nl != None:
- LUtil.log('luLast:%s:' % LUtil.l_last_nl.group(), 7)
- return LUtil.l_last_nl
+ if LUtil.l_last_nl != None:
+ LUtil.log('luLast:%s:' % LUtil.l_last_nl.group(), 7)
+ return LUtil.l_last_nl
else:
- if LUtil.l_last != None:
- LUtil.log('luLast:%s:' % LUtil.l_last.group(), 7)
- return LUtil.l_last
+ if LUtil.l_last != None:
+ LUtil.log('luLast:%s:' % LUtil.l_last.group(), 7)
+ return LUtil.l_last
def luInclude(filename, CallOnFail=None):
tstFile = LUtil.base_script_dir + '/' + filename
logger.debug("Entering lib API: verify_ospf_neighbor()")
result = False
if input_dict:
- for router, rnode in tgen.routers().iteritems():
+ for router, rnode in tgen.routers().items():
if 'ospf' not in topo['routers'][router]:
continue
return errormsg
continue
else:
- for router, rnode in tgen.routers().iteritems():
+ for router, rnode in tgen.routers().items():
if 'ospf' not in topo['routers'][router]:
continue
additional_nexthops_in_required_nhs = []
found_hops = []
for routerInput in input_dict.keys():
- for router, rnode in router_list.iteritems():
+ for router, rnode in router_list.items():
if router != dut:
continue
logger.debug("Entering lib API: verify_ospf_interface()")
result = False
- for router, rnode in tgen.routers().iteritems():
+ for router, rnode in tgen.routers().items():
if 'ospf' not in topo['routers'][router]:
continue
import os
import sys
+import io
import logging
import json
"verbosity": "info",
"frrdir": "/usr/lib/frr",
"routertype": "frr",
- "memleak_path": None,
+ "memleak_path": "",
}
params["frrdir"] = self.config.get(self.CONFIG_SECTION, "frrdir")
params["memleak_path"] = self.config.get(self.CONFIG_SECTION, "memleak_path")
- if not params.has_key("routertype"):
+ if "routertype" not in params:
params["routertype"] = self.config.get(self.CONFIG_SECTION, "routertype")
self.gears[name] = TopoRouter(self, cls, name, **params)
memleak_file = os.environ.get("TOPOTESTS_CHECK_MEMLEAK") or self.config.get(
self.CONFIG_SECTION, "memleak_path"
)
- if memleak_file is None:
+ if memleak_file == "" or memleak_file == None:
return False
return True
self.cls = cls
self.options = {}
self.routertype = params.get("routertype", "frr")
- if not params.has_key("privateDirs"):
+ if "privateDirs" not in params:
params["privateDirs"] = self.PRIVATE_DIRS
self.options["memleak_path"] = params.get("memleak_path", None)
memleak_file = (
os.environ.get("TOPOTESTS_CHECK_MEMLEAK") or self.options["memleak_path"]
)
- if memleak_file is None:
+ if memleak_file == "" or memleak_file == None:
return
self.stop()
logger.info("Running environment diagnostics")
# Load configuration
- config = configparser.ConfigParser(tgen_defaults)
+ config = configparser.ConfigParser(defaults=tgen_defaults)
pytestini_path = os.path.join(CWD, "../pytest.ini")
config.read(pytestini_path)
# TODO remove me when we start supporting exabgp >= 4
try:
- output = subprocess.check_output(["exabgp", "-v"])
- line = output.split("\n")[0]
- version = line.split(" ")[2]
- if topotest.version_cmp(version, "4") >= 0:
+ p = os.popen("exabgp -v")
+ line = p.readlines()
+ version = line[0].split()
+ if topotest.version_cmp(version[2], "4") >= 0:
logger.warning(
"BGP topologies are still using exabgp version 3, expect failures"
)
# Physical Interfaces
if "links" in topo['switches'][curSwitch]:
for destRouterLink, data in sorted(
- topo['switches'][curSwitch]['links'].iteritems()):
+ topo['switches'][curSwitch]['links'].items()):
# Loopback interfaces
if "dst_node" in data:
"""
if log_level is None:
log_level = self.log_level
- if self.loggers.has_key(name):
+ if name in self.loggers:
return self.loggers[name]
nlogger = logging.Logger(name, level=log_level)
# Backward compatibility:
# Load configuration defaults like topogen.
self.config_defaults = configparser.ConfigParser(
- {
+ defaults = {
"verbosity": "info",
"frrdir": "/usr/lib/frr",
"routertype": "frr",
- "memleak_path": None,
+ "memleak_path": "",
}
)
self.config_defaults.read(
for daemon in self.daemons:
if (self.daemons[daemon] == 1) and not (daemon in daemonsRunning):
sys.stderr.write("%s: Daemon %s not running\n" % (self.name, daemon))
- if daemon is "staticd":
+ if daemon == "staticd":
sys.stderr.write(
"You may have a copy of staticd installed but are attempting to test against\n"
)
logger.info("{}: running version: {}".format(self.name, self.version))
rversion = self.version
- if rversion is None:
+ if rversion == None:
return False
result = version_cmp(rversion, version)
os.path.join(CWD, '{}/ospfd.conf'.format(rname))
)
- # What is this? OSPF Unnumbered depends on the rp_filter
- # being set appropriately( HA! )
- # Effectively we are putting different /32's on the interface
- # the multicast packet delivery is somewhat controlled by
- # the rp_filter. Setting it to '0' allows the OS to pass
- # up the mcast packet not destined for the local routers
- # network.
- topotest.set_sysctl(tgen.net['r1'],
- 'net.ipv4.conf.r1-eth1.rp_filter', 0)
+ # What is this? OSPF Unnumbered depends on the rp_filter
+ # being set appropriately( HA! )
+ # Effectively we are putting different /32's on the interface
+ # the multicast packet delivery is somewhat controlled by
+ # the rp_filter. Setting it to '0' allows the OS to pass
+ # up the mcast packet not destined for the local routers
+ # network.
+ topotest.set_sysctl(tgen.net['r1'],
+ 'net.ipv4.conf.r1-eth1.rp_filter', 0)
topotest.set_sysctl(tgen.net['r1'],
'net.ipv4.conf.all.rp_filter', 0)
- topotest.set_sysctl(tgen.net['r2'],
- 'net.ipv4.conf.r2-eth1.rp_filter', 0)
+ topotest.set_sysctl(tgen.net['r2'],
+ 'net.ipv4.conf.r2-eth1.rp_filter', 0)
topotest.set_sysctl(tgen.net['r2'],
'net.ipv4.conf.all.rp_filter', 0)
krel = platform.release()
if topotest.version_cmp(krel, "4.10") < 0:
- tgen.errors = "Newer kernel than 4.9 needed for pbr tests"
- pytest.skip(tgen.errors)
+ tgen.errors = "Newer kernel than 4.9 needed for pbr tests"
+ pytest.skip(tgen.errors)
router_list = tgen.routers()
for rname, router in router_list.items():
test_func = partial(topotest.router_json_cmp, router, "show pbr interface json", expected)
_, result = topotest.run_and_expect(test_func, None, count=30, wait=1)
assertmsg = '"show pbr interface" mismatches on {}'.format(router.name)
- if result is not None:
- gather_pbr_data_on_error(router)
+ if result is not None:
+ gather_pbr_data_on_error(router)
assert result is None, assertmsg
map_file = "{}/{}/pbr-map.json".format(CWD, router.name)
test_func = partial(topotest.router_json_cmp, router, "show pbr map json", expected)
_, result = topotest.run_and_expect(test_func, None, count=30, wait=1)
assertmsg = '"show pbr map" mismatches on {}'.format(router.name)
- if result is not None:
- gather_pbr_data_on_error(router)
+ if result is not None:
+ gather_pbr_data_on_error(router)
assert result is None, assertmsg
nexthop_file = "{}/{}/pbr-nexthop-groups.json".format(CWD, router.name)
test_func = partial(topotest.router_json_cmp, router, "show pbr nexthop-groups json", expected)
_, result = topotest.run_and_expect(test_func, None, count=30, wait=1)
assertmsg = '"show pbr nexthop-groups" mismatches on {}'.format(router.name)
- if result is not None:
- gather_pbr_data_on_error(router)
+ if result is not None:
+ gather_pbr_data_on_error(router)
assert result is None, assertmsg
def test_pbr_flap():
test_func = partial(topotest.router_json_cmp, router, "show pbr interface json", expected)
_, result = topotest.run_and_expect(test_func, None, count=30, wait=1)
assertmsg = '"show pbr interface" mismatches on {}'.format(router.name)
- if result is not None:
- gather_pbr_data_on_error(router)
+ if result is not None:
+ gather_pbr_data_on_error(router)
assert result is None, assertmsg
switch[4] = self.addSwitch("sw4", cls=topotest.LegacySwitch)
self.addLink(switch[4], router[3], intfName2="r3-eth0")
- switch[5] = self.addSwitch("sw5", cls=topotest.LegacySwitch)
- self.addLink(switch[5], router[1], intfName2="r1-eth2")
+ switch[5] = self.addSwitch("sw5", cls=topotest.LegacySwitch)
+ self.addLink(switch[5], router[1], intfName2="r1-eth2")
- switch[6] = self.addSwitch("sw6", cls=topotest.LegacySwitch)
- self.addLink(switch[6], router[1], intfName2="r1-eth3")
+ switch[6] = self.addSwitch("sw6", cls=topotest.LegacySwitch)
+ self.addLink(switch[6], router[1], intfName2="r1-eth3")
#####################################################