diff options
Diffstat (limited to 'tests/topotests/all-protocol-startup/test_all_protocol_startup.py')
| -rw-r--r-- | tests/topotests/all-protocol-startup/test_all_protocol_startup.py | 773 |
1 files changed, 488 insertions, 285 deletions
diff --git a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py index 4b57928366..0254ff6af6 100644 --- a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py +++ b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py @@ -55,6 +55,7 @@ fatal_error = "" ## ##################################################### + class NetworkTopo(Topo): "All Protocol Startup Test" @@ -64,15 +65,15 @@ class NetworkTopo(Topo): router = {} # # Setup Main Router - router[1] = topotest.addRouter(self, 'r1') + router[1] = topotest.addRouter(self, "r1") # # Setup Switches switch = {} # for i in range(0, 10): - switch[i] = self.addSwitch('sw%s' % i, cls=topotest.LegacySwitch) - self.addLink(switch[i], router[1], intfName2='r1-eth%s' % i ) + switch[i] = self.addSwitch("sw%s" % i, cls=topotest.LegacySwitch) + self.addLink(switch[i], router[1], intfName2="r1-eth%s" % i) ##################################################### @@ -81,6 +82,7 @@ class NetworkTopo(Topo): ## ##################################################### + def setup_module(module): global topo, net global fatal_error @@ -89,8 +91,8 @@ def setup_module(module): print("******************************************\n") print("Cleanup old Mininet runs") - os.system('sudo mn -c > /dev/null 2>&1') - os.system('sudo rm /tmp/r* > /dev/null 2>&1') + os.system("sudo mn -c > /dev/null 2>&1") + os.system("sudo rm /tmp/r* > /dev/null 2>&1") thisDir = os.path.dirname(os.path.realpath(__file__)) topo = NetworkTopo() @@ -98,33 +100,35 @@ def setup_module(module): net = Mininet(controller=None, topo=topo) net.start() - if net['r1'].get_routertype() != 'frr': + if net["r1"].get_routertype() != "frr": fatal_error = "Test is only implemented for FRR" - sys.stderr.write('\n\nTest is only implemented for FRR - Skipping\n\n') + sys.stderr.write("\n\nTest is only implemented for FRR - Skipping\n\n") pytest.skip(fatal_error) - + # Starting Routers # # Main router for i in range(1, 2): - net['r%s' % i].loadConf('zebra', '%s/r%s/zebra.conf' % (thisDir, i)) - net['r%s' % i].loadConf('ripd', '%s/r%s/ripd.conf' % (thisDir, i)) - net['r%s' % i].loadConf('ripngd', '%s/r%s/ripngd.conf' % (thisDir, i)) - net['r%s' % i].loadConf('ospfd', '%s/r%s/ospfd.conf' % (thisDir, i)) - if net['r1'].checkRouterVersion('<', '4.0'): - net['r%s' % i].loadConf('ospf6d', '%s/r%s/ospf6d.conf-pre-v4' % (thisDir, i)) + net["r%s" % i].loadConf("zebra", "%s/r%s/zebra.conf" % (thisDir, i)) + net["r%s" % i].loadConf("ripd", "%s/r%s/ripd.conf" % (thisDir, i)) + net["r%s" % i].loadConf("ripngd", "%s/r%s/ripngd.conf" % (thisDir, i)) + net["r%s" % i].loadConf("ospfd", "%s/r%s/ospfd.conf" % (thisDir, i)) + if net["r1"].checkRouterVersion("<", "4.0"): + net["r%s" % i].loadConf( + "ospf6d", "%s/r%s/ospf6d.conf-pre-v4" % (thisDir, i) + ) else: - net['r%s' % i].loadConf('ospf6d', '%s/r%s/ospf6d.conf' % (thisDir, i)) - net['r%s' % i].loadConf('isisd', '%s/r%s/isisd.conf' % (thisDir, i)) - net['r%s' % i].loadConf('bgpd', '%s/r%s/bgpd.conf' % (thisDir, i)) - if net['r%s' % i].daemon_available('ldpd'): + net["r%s" % i].loadConf("ospf6d", "%s/r%s/ospf6d.conf" % (thisDir, i)) + net["r%s" % i].loadConf("isisd", "%s/r%s/isisd.conf" % (thisDir, i)) + net["r%s" % i].loadConf("bgpd", "%s/r%s/bgpd.conf" % (thisDir, i)) + if net["r%s" % i].daemon_available("ldpd"): # Only test LDPd if it's installed and Kernel >= 4.5 - net['r%s' % i].loadConf('ldpd', '%s/r%s/ldpd.conf' % (thisDir, i)) - net['r%s' % i].loadConf('sharpd') - net['r%s' % i].loadConf('nhrpd', '%s/r%s/nhrpd.conf' % (thisDir, i)) - net['r%s' % i].loadConf('babeld', '%s/r%s/babeld.conf' % (thisDir, i)) - net['r%s' % i].loadConf('pbrd', '%s/r%s/pbrd.conf' % (thisDir, i)) - net['r%s' % i].startRouter() + net["r%s" % i].loadConf("ldpd", "%s/r%s/ldpd.conf" % (thisDir, i)) + net["r%s" % i].loadConf("sharpd") + net["r%s" % i].loadConf("nhrpd", "%s/r%s/nhrpd.conf" % (thisDir, i)) + net["r%s" % i].loadConf("babeld", "%s/r%s/babeld.conf" % (thisDir, i)) + net["r%s" % i].loadConf("pbrd", "%s/r%s/pbrd.conf" % (thisDir, i)) + net["r%s" % i].startRouter() # For debugging after starting FRR daemons, uncomment the next line # CLI(net) @@ -145,7 +149,7 @@ def test_router_running(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) print("\n\n** Check if FRR is running on each Router node") @@ -154,7 +158,7 @@ def test_router_running(): # Starting Routers for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -166,7 +170,7 @@ def test_error_messages_vtysh(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) print("\n\n** Check for error messages on VTYSH") @@ -179,38 +183,38 @@ def test_error_messages_vtysh(): # # VTYSH output from router - vtystdout = net['r%s' % i].cmd('vtysh -c "show version" 2> /dev/null').rstrip() + vtystdout = net["r%s" % i].cmd('vtysh -c "show version" 2> /dev/null').rstrip() # Fix newlines (make them all the same) - vtystdout = ('\n'.join(vtystdout.splitlines()) + '\n').rstrip() + vtystdout = ("\n".join(vtystdout.splitlines()) + "\n").rstrip() # Drop everything starting with "FRRouting X.xx" message vtystdout = re.sub(r"FRRouting [0-9]+.*", "", vtystdout, flags=re.DOTALL) - if (vtystdout == ''): + if vtystdout == "": print("r%s StdOut ok" % i) - assert vtystdout == '', "Vtysh StdOut Output check failed for router r%s" % i + assert vtystdout == "", "Vtysh StdOut Output check failed for router r%s" % i # # Second checking Standard Error # # VTYSH StdErr output from router - vtystderr = net['r%s' % i].cmd('vtysh -c "show version" > /dev/null').rstrip() + vtystderr = net["r%s" % i].cmd('vtysh -c "show version" > /dev/null').rstrip() # Fix newlines (make them all the same) - vtystderr = ('\n'.join(vtystderr.splitlines()) + '\n').rstrip() + vtystderr = ("\n".join(vtystderr.splitlines()) + "\n").rstrip() # # Drop everything starting with "FRRouting X.xx" message - # vtystderr = re.sub(r"FRRouting [0-9]+.*", "", vtystderr, flags=re.DOTALL) + # vtystderr = re.sub(r"FRRouting [0-9]+.*", "", vtystderr, flags=re.DOTALL) - if (vtystderr == ''): + if vtystderr == "": print("r%s StdErr ok" % i) - assert vtystderr == '', "Vtysh StdErr Output check failed for router r%s" % i + assert vtystderr == "", "Vtysh StdErr Output check failed for router r%s" % i # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -222,7 +226,7 @@ def test_error_messages_daemons(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) print("\n\n** Check for error messages in daemons") @@ -231,67 +235,73 @@ def test_error_messages_daemons(): error_logs = "" for i in range(1, 2): - log = net['r%s' % i].getStdErr('ripd') + log = net["r%s" % i].getStdErr("ripd") if log: error_logs += "r%s RIPd StdErr Output:\n" % i error_logs += log - log = net['r%s' % i].getStdErr('ripngd') + log = net["r%s" % i].getStdErr("ripngd") if log: error_logs += "r%s RIPngd StdErr Output:\n" % i error_logs += log - log = net['r%s' % i].getStdErr('ospfd') + log = net["r%s" % i].getStdErr("ospfd") if log: error_logs += "r%s OSPFd StdErr Output:\n" % i error_logs += log - log = net['r%s' % i].getStdErr('ospf6d') + log = net["r%s" % i].getStdErr("ospf6d") if log: error_logs += "r%s OSPF6d StdErr Output:\n" % i error_logs += log - log = net['r%s' % i].getStdErr('isisd') + log = net["r%s" % i].getStdErr("isisd") # ISIS shows debugging enabled status on StdErr # Remove these messages log = re.sub(r"^IS-IS .* debugging is on.*", "", log).rstrip() if log: error_logs += "r%s ISISd StdErr Output:\n" % i error_logs += log - log = net['r%s' % i].getStdErr('bgpd') + log = net["r%s" % i].getStdErr("bgpd") if log: error_logs += "r%s BGPd StdErr Output:\n" % i error_logs += log - if (net['r%s' % i].daemon_available('ldpd')): - log = net['r%s' % i].getStdErr('ldpd') + if net["r%s" % i].daemon_available("ldpd"): + log = net["r%s" % i].getStdErr("ldpd") if log: error_logs += "r%s LDPd StdErr Output:\n" % i error_logs += log - log = net['r1'].getStdErr('nhrpd') + log = net["r1"].getStdErr("nhrpd") if log: error_logs += "r%s NHRPd StdErr Output:\n" % i error_logs += log - log = net['r1'].getStdErr('babeld') + log = net["r1"].getStdErr("babeld") if log: error_logs += "r%s BABELd StdErr Output:\n" % i error_logs += log - log = net['r1'].getStdErr('pbrd') + log = net["r1"].getStdErr("pbrd") if log: error_logs += "r%s PBRd StdErr Output:\n" % i error_logs += log - log = net['r%s' % i].getStdErr('zebra') + log = net["r%s" % i].getStdErr("zebra") if log: error_logs += "r%s Zebra StdErr Output:\n" error_logs += log if error_logs: - sys.stderr.write('Failed check for StdErr Output on daemons:\n%s\n' % error_logs) + sys.stderr.write( + "Failed check for StdErr Output on daemons:\n%s\n" % error_logs + ) # Ignoring the issue if told to ignore (ie not yet fixed) - if (error_logs != ""): - if (os.environ.get('bamboo_TOPOTESTS_ISSUE_349') == "IGNORE"): - sys.stderr.write('Known issue - IGNORING. See https://github.com/FRRouting/frr/issues/349\n') - pytest.skip('Known issue - IGNORING. See https://github.com/FRRouting/frr/issues/349') + if error_logs != "": + if os.environ.get("bamboo_TOPOTESTS_ISSUE_349") == "IGNORE": + sys.stderr.write( + "Known issue - IGNORING. See https://github.com/FRRouting/frr/issues/349\n" + ) + pytest.skip( + "Known issue - IGNORING. See https://github.com/FRRouting/frr/issues/349" + ) assert error_logs == "", "Daemons report errors to StdErr" @@ -304,7 +314,7 @@ def test_converge_protocols(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -318,62 +328,84 @@ def test_converge_protocols(): # Make sure that all daemons are running failures = 0 for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error - print("Show that v4 routes are right\n"); - v4_routesFile = '%s/r%s/ipv4_routes.ref' % (thisDir, i) + print("Show that v4 routes are right\n") + v4_routesFile = "%s/r%s/ipv4_routes.ref" % (thisDir, i) expected = open(v4_routesFile).read().rstrip() - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) - - actual = net['r%s' %i].cmd('vtysh -c "show ip route" | /usr/bin/tail -n +7 | env LC_ALL=en_US.UTF-8 sort 2> /dev/null').rstrip() + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) + + actual = ( + net["r%s" % i] + .cmd( + 'vtysh -c "show ip route" | /usr/bin/tail -n +7 | env LC_ALL=en_US.UTF-8 sort 2> /dev/null' + ) + .rstrip() + ) # Drop time in last update actual = re.sub(r" [0-2][0-9]:[0-5][0-9]:[0-5][0-9]", " XX:XX:XX", actual) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) - diff = topotest.get_textdiff(actual, expected, - title1="Actual IP Routing Table", - title2="Expected IP RoutingTable") + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) + diff = topotest.get_textdiff( + actual, + expected, + title1="Actual IP Routing Table", + title2="Expected IP RoutingTable", + ) if diff: - sys.stderr.write('r%s failed IP Routing table check:\n%s\n' % (i, diff)) + sys.stderr.write("r%s failed IP Routing table check:\n%s\n" % (i, diff)) failures += 1 else: - print("r%s ok" %i) + print("r%s ok" % i) assert failures == 0, "IP Routing table failed for r%s\n%s" % (i, diff) failures = 0 print("Show that v6 routes are right\n") - v6_routesFile = '%s/r%s/ipv6_routes.ref' % (thisDir, i) + v6_routesFile = "%s/r%s/ipv6_routes.ref" % (thisDir, i) expected = open(v6_routesFile).read().rstrip() - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) - - actual = net['r%s' %i].cmd('vtysh -c "show ipv6 route" | /usr/bin/tail -n +7 | env LC_ALL=en_US.UTF-8 sort 2> /dev/null').rstrip() + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) + + actual = ( + net["r%s" % i] + .cmd( + 'vtysh -c "show ipv6 route" | /usr/bin/tail -n +7 | env LC_ALL=en_US.UTF-8 sort 2> /dev/null' + ) + .rstrip() + ) # Drop time in last update actual = re.sub(r" [0-2][0-9]:[0-5][0-9]:[0-5][0-9]", " XX:XX:XX", actual) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) - diff = topotest.get_textdiff(actual, expected, - title1="Actual IPv6 Routing Table", - title2="Expected IPv6 RoutingTable") + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) + diff = topotest.get_textdiff( + actual, + expected, + title1="Actual IPv6 Routing Table", + title2="Expected IPv6 RoutingTable", + ) if diff: - sys.stderr.write('r%s failed IPv6 Routing table check:\n%s\n' % (i, diff)) + sys.stderr.write("r%s failed IPv6 Routing table check:\n%s\n" % (i, diff)) failures += 1 else: - print("r%s ok" %i) + print("r%s ok" % i) assert failures == 0, "IPv6 Routing table failed for r%s\n%s" % (i, diff) # For debugging after starting FRR daemons, uncomment the next line ## CLI(net) + def route_get_nhg_id(route_str): output = net["r1"].cmd('vtysh -c "show ip route %s nexthop-group"' % route_str) match = re.search(r"Nexthop Group ID: (\d+)", output) - assert match is not None, "Nexthop Group ID not found for sharpd route %s" % route_str + assert match is not None, ( + "Nexthop Group ID not found for sharpd route %s" % route_str + ) nhg_id = int(match.group(1)) return nhg_id + def verify_nexthop_group(nhg_id, recursive=False, ecmp=0): # Verify NHG is valid/installed output = net["r1"].cmd('vtysh -c "show nexthop-group rib %d"' % nhg_id) @@ -389,10 +421,14 @@ def verify_nexthop_group(nhg_id, recursive=False, ecmp=0): depends = re.findall(r"\((\d+)\)", match.group(0)) if ecmp: - assert (len(depends) == ecmp), "Nexthop Group ID=%d doesn't match ecmp size" % nhg_id + assert len(depends) == ecmp, ( + "Nexthop Group ID=%d doesn't match ecmp size" % nhg_id + ) else: # If recursive, we need to look at its resolved group - assert (len(depends) == 1), "Nexthop Group ID=%d should only have one recursive depend" % nhg_id + assert len(depends) == 1, ( + "Nexthop Group ID=%d should only have one recursive depend" % nhg_id + ) resolved_id = int(depends[0]) verify_nexthop_group(resolved_id, False) @@ -400,17 +436,19 @@ def verify_nexthop_group(nhg_id, recursive=False, ecmp=0): match = re.search(r"Installed", output) assert match is not None, "Nexthop Group ID=%d not marked Installed" % nhg_id + def verify_route_nexthop_group(route_str, recursive=False, ecmp=0): # Verify route and that zebra created NHGs for and they are valid/installed nhg_id = route_get_nhg_id(route_str) verify_nexthop_group(nhg_id, recursive, ecmp) + def test_nexthop_groups(): global fatal_error global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) print("\n\n** Verifying Nexthop Groups") @@ -421,7 +459,9 @@ def test_nexthop_groups(): ## Basic test # Create a lib nexthop-group - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group basic" -c "nexthop 1.1.1.1" -c "nexthop 1.1.1.2"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group basic" -c "nexthop 1.1.1.1" -c "nexthop 1.1.1.2"' + ) # Create with sharpd using nexthop-group net["r1"].cmd('vtysh -c "sharp install routes 2.2.2.1 nexthop-group basic 1"') @@ -430,7 +470,9 @@ def test_nexthop_groups(): ## Connected - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group connected" -c "nexthop r1-eth1" -c "nexthop r1-eth2"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group connected" -c "nexthop r1-eth1" -c "nexthop r1-eth2"' + ) net["r1"].cmd('vtysh -c "sharp install routes 2.2.2.2 nexthop-group connected 1"') @@ -438,15 +480,21 @@ def test_nexthop_groups(): ## Recursive - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group basic-recursive" -c "nexthop 2.2.2.1"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group basic-recursive" -c "nexthop 2.2.2.1"' + ) - net["r1"].cmd('vtysh -c "sharp install routes 3.3.3.1 nexthop-group basic-recursive 1"') + net["r1"].cmd( + 'vtysh -c "sharp install routes 3.3.3.1 nexthop-group basic-recursive 1"' + ) verify_route_nexthop_group("3.3.3.1/32", True) ## Duplicate - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group duplicate" -c "nexthop 2.2.2.1" -c "nexthop 1.1.1.1"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group duplicate" -c "nexthop 2.2.2.1" -c "nexthop 1.1.1.1"' + ) net["r1"].cmd('vtysh -c "sharp install routes 3.3.3.2 nexthop-group duplicate 1"') @@ -454,15 +502,19 @@ def test_nexthop_groups(): ## Two 4-Way ECMP - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group fourA" -c "nexthop 1.1.1.1" -c "nexthop 1.1.1.2" \ - -c "nexthop 1.1.1.3" -c "nexthop 1.1.1.4"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group fourA" -c "nexthop 1.1.1.1" -c "nexthop 1.1.1.2" \ + -c "nexthop 1.1.1.3" -c "nexthop 1.1.1.4"' + ) net["r1"].cmd('vtysh -c "sharp install routes 4.4.4.1 nexthop-group fourA 1"') verify_route_nexthop_group("4.4.4.1/32") - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group fourB" -c "nexthop 1.1.1.5" -c "nexthop 1.1.1.6" \ - -c "nexthop 1.1.1.7" -c "nexthop 1.1.1.8"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group fourB" -c "nexthop 1.1.1.5" -c "nexthop 1.1.1.6" \ + -c "nexthop 1.1.1.7" -c "nexthop 1.1.1.8"' + ) net["r1"].cmd('vtysh -c "sharp install routes 4.4.4.2 nexthop-group fourB 1"') @@ -470,9 +522,13 @@ def test_nexthop_groups(): ## Recursive to 8-Way ECMP - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group eight-recursive" -c "nexthop 4.4.4.1" -c "nexthop 4.4.4.2"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group eight-recursive" -c "nexthop 4.4.4.1" -c "nexthop 4.4.4.2"' + ) - net["r1"].cmd('vtysh -c "sharp install routes 5.5.5.1 nexthop-group eight-recursive 1"') + net["r1"].cmd( + 'vtysh -c "sharp install routes 5.5.5.1 nexthop-group eight-recursive 1"' + ) verify_route_nexthop_group("5.5.5.1/32") @@ -488,12 +544,13 @@ def test_nexthop_groups(): net["r1"].cmd('vtysh -c "sharp remove routes 4.4.4.2 1"') net["r1"].cmd('vtysh -c "sharp remove routes 5.5.5.1 1"') + def test_rip_status(): global fatal_error global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -502,30 +559,37 @@ def test_rip_status(): print("******************************************\n") failures = 0 for i in range(1, 2): - refTableFile = '%s/r%s/rip_status.ref' % (thisDir, i) + refTableFile = "%s/r%s/rip_status.ref" % (thisDir, i) if os.path.isfile(refTableFile): # Read expected result from file expected = open(refTableFile).read().rstrip() # Fix newlines (make them all the same) - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) # Actual output from router - actual = net['r%s' % i].cmd('vtysh -c "show ip rip status" 2> /dev/null').rstrip() - # Drop time in next due + actual = ( + net["r%s" % i] + .cmd('vtysh -c "show ip rip status" 2> /dev/null') + .rstrip() + ) + # Drop time in next due actual = re.sub(r"in [0-9]+ seconds", "in XX seconds", actual) # Drop time in last update actual = re.sub(r" [0-2][0-9]:[0-5][0-9]:[0-5][0-9]", " XX:XX:XX", actual) # Fix newlines (make them all the same) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) # Generate Diff - diff = topotest.get_textdiff(actual, expected, + diff = topotest.get_textdiff( + actual, + expected, title1="actual IP RIP status", - title2="expected IP RIP status") + title2="expected IP RIP status", + ) # Empty string if it matches, otherwise diff contains unified diff if diff: - sys.stderr.write('r%s failed IP RIP status check:\n%s\n' % (i, diff)) + sys.stderr.write("r%s failed IP RIP status check:\n%s\n" % (i, diff)) failures += 1 else: print("r%s ok" % i) @@ -534,7 +598,7 @@ def test_rip_status(): # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -546,7 +610,7 @@ def test_ripng_status(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -555,41 +619,53 @@ def test_ripng_status(): print("******************************************\n") failures = 0 for i in range(1, 2): - refTableFile = '%s/r%s/ripng_status.ref' % (thisDir, i) + refTableFile = "%s/r%s/ripng_status.ref" % (thisDir, i) if os.path.isfile(refTableFile): # Read expected result from file expected = open(refTableFile).read().rstrip() # Fix newlines (make them all the same) - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) # Actual output from router - actual = net['r%s' % i].cmd('vtysh -c "show ipv6 ripng status" 2> /dev/null').rstrip() + actual = ( + net["r%s" % i] + .cmd('vtysh -c "show ipv6 ripng status" 2> /dev/null') + .rstrip() + ) # Mask out Link-Local mac address portion. They are random... actual = re.sub(r" fe80::[0-9a-f:]+", " fe80::XXXX:XXXX:XXXX:XXXX", actual) - # Drop time in next due + # Drop time in next due actual = re.sub(r"in [0-9]+ seconds", "in XX seconds", actual) # Drop time in last update actual = re.sub(r" [0-2][0-9]:[0-5][0-9]:[0-5][0-9]", " XX:XX:XX", actual) # Fix newlines (make them all the same) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) # Generate Diff - diff = topotest.get_textdiff(actual, expected, + diff = topotest.get_textdiff( + actual, + expected, title1="actual IPv6 RIPng status", - title2="expected IPv6 RIPng status") + title2="expected IPv6 RIPng status", + ) # Empty string if it matches, otherwise diff contains unified diff if diff: - sys.stderr.write('r%s failed IPv6 RIPng status check:\n%s\n' % (i, diff)) + sys.stderr.write( + "r%s failed IPv6 RIPng status check:\n%s\n" % (i, diff) + ) failures += 1 else: print("r%s ok" % i) - assert failures == 0, "IPv6 RIPng status failed for router r%s:\n%s" % (i, diff) + assert failures == 0, "IPv6 RIPng status failed for router r%s:\n%s" % ( + i, + diff, + ) # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -601,7 +677,7 @@ def test_ospfv2_interfaces(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -610,50 +686,71 @@ def test_ospfv2_interfaces(): print("******************************************\n") failures = 0 for i in range(1, 2): - refTableFile = '%s/r%s/show_ip_ospf_interface.ref' % (thisDir, i) + refTableFile = "%s/r%s/show_ip_ospf_interface.ref" % (thisDir, i) if os.path.isfile(refTableFile): # Read expected result from file expected = open(refTableFile).read().rstrip() # Fix newlines (make them all the same) - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) # Actual output from router - actual = net['r%s' % i].cmd('vtysh -c "show ip ospf interface" 2> /dev/null').rstrip() + actual = ( + net["r%s" % i] + .cmd('vtysh -c "show ip ospf interface" 2> /dev/null') + .rstrip() + ) # Mask out Bandwidth portion. They may change.. actual = re.sub(r"BW [0-9]+ Mbit", "BW XX Mbit", actual) actual = re.sub(r"ifindex [0-9]", "ifindex X", actual) - # Drop time in next due + # Drop time in next due actual = re.sub(r"Hello due in [0-9\.]+s", "Hello due in XX.XXXs", actual) - actual = re.sub(r"Hello due in [0-9\.]+ usecs", "Hello due in XX.XXXs", actual) + actual = re.sub( + r"Hello due in [0-9\.]+ usecs", "Hello due in XX.XXXs", actual + ) # Fix 'MTU mismatch detection: enabled' vs 'MTU mismatch detection:enabled' - accept both - actual = re.sub(r"MTU mismatch detection:([a-z]+.*)", r"MTU mismatch detection: \1", actual) + actual = re.sub( + r"MTU mismatch detection:([a-z]+.*)", + r"MTU mismatch detection: \1", + actual, + ) # Fix newlines (make them all the same) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) # Generate Diff - diff = topotest.get_textdiff(actual, expected, + diff = topotest.get_textdiff( + actual, + expected, title1="actual SHOW IP OSPF INTERFACE", - title2="expected SHOW IP OSPF INTERFACE") + title2="expected SHOW IP OSPF INTERFACE", + ) # Empty string if it matches, otherwise diff contains unified diff if diff: - sys.stderr.write('r%s failed SHOW IP OSPF INTERFACE check:\n%s\n' % (i, diff)) + sys.stderr.write( + "r%s failed SHOW IP OSPF INTERFACE check:\n%s\n" % (i, diff) + ) failures += 1 else: print("r%s ok" % i) # Ignoring the issue if told to ignore (ie not yet fixed) - if (failures != 0): - if (os.environ.get('bamboo_TOPOTESTS_ISSUE_348') == "IGNORE"): - sys.stderr.write('Known issue - IGNORING. See https://github.com/FRRouting/frr/issues/348\n') - pytest.skip('Known issue - IGNORING. See https://github.com/FRRouting/frr/issues/348') - - assert failures == 0, "SHOW IP OSPF INTERFACE failed for router r%s:\n%s" % (i, diff) + if failures != 0: + if os.environ.get("bamboo_TOPOTESTS_ISSUE_348") == "IGNORE": + sys.stderr.write( + "Known issue - IGNORING. See https://github.com/FRRouting/frr/issues/348\n" + ) + pytest.skip( + "Known issue - IGNORING. See https://github.com/FRRouting/frr/issues/348" + ) + + assert ( + failures == 0 + ), "SHOW IP OSPF INTERFACE failed for router r%s:\n%s" % (i, diff) # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -665,7 +762,7 @@ def test_isis_interfaces(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -674,42 +771,52 @@ def test_isis_interfaces(): print("******************************************\n") failures = 0 for i in range(1, 2): - refTableFile = '%s/r%s/show_isis_interface_detail.ref' % (thisDir, i) + refTableFile = "%s/r%s/show_isis_interface_detail.ref" % (thisDir, i) if os.path.isfile(refTableFile): # Read expected result from file expected = open(refTableFile).read().rstrip() # Fix newlines (make them all the same) - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) # Actual output from router - actual = net['r%s' % i].cmd('vtysh -c "show isis interface detail" 2> /dev/null').rstrip() + actual = ( + net["r%s" % i] + .cmd('vtysh -c "show isis interface detail" 2> /dev/null') + .rstrip() + ) # Mask out Link-Local mac address portion. They are random... actual = re.sub(r"fe80::[0-9a-f:]+", "fe80::XXXX:XXXX:XXXX:XXXX", actual) # Mask out SNPA mac address portion. They are random... actual = re.sub(r"SNPA: [0-9a-f\.]+", "SNPA: XXXX.XXXX.XXXX", actual) # Mask out Circuit ID number - actual = re.sub(r"Circuit Id: 0x[0-9a-f]+", "Circuit Id: 0xXX", - actual) + actual = re.sub(r"Circuit Id: 0x[0-9a-f]+", "Circuit Id: 0xXX", actual) # Fix newlines (make them all the same) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) # Generate Diff - diff = topotest.get_textdiff(actual, expected, + diff = topotest.get_textdiff( + actual, + expected, title1="actual SHOW ISIS INTERFACE DETAIL", - title2="expected SHOW ISIS OSPF6 INTERFACE DETAIL") + title2="expected SHOW ISIS OSPF6 INTERFACE DETAIL", + ) # Empty string if it matches, otherwise diff contains unified diff if diff: - sys.stderr.write('r%s failed SHOW ISIS INTERFACE DETAIL check:\n%s\n' % (i, diff)) + sys.stderr.write( + "r%s failed SHOW ISIS INTERFACE DETAIL check:\n%s\n" % (i, diff) + ) failures += 1 else: print("r%s ok" % i) - assert failures == 0, "SHOW ISIS INTERFACE DETAIL failed for router r%s:\n%s" % (i, diff) + assert ( + failures == 0 + ), "SHOW ISIS INTERFACE DETAIL failed for router r%s:\n%s" % (i, diff) # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -721,7 +828,7 @@ def test_bgp_summary(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -730,15 +837,19 @@ def test_bgp_summary(): print("******************************************\n") failures = 0 for i in range(1, 2): - refTableFile = '%s/r%s/show_ip_bgp_summary.ref' % (thisDir, i) + refTableFile = "%s/r%s/show_ip_bgp_summary.ref" % (thisDir, i) if os.path.isfile(refTableFile): # Read expected result from file expected = open(refTableFile).read().rstrip() # Fix newlines (make them all the same) - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) # Actual output from router - actual = net['r%s' % i].cmd('vtysh -c "show ip bgp summary" 2> /dev/null').rstrip() + actual = ( + net["r%s" % i] + .cmd('vtysh -c "show ip bgp summary" 2> /dev/null') + .rstrip() + ) # Mask out "using XXiXX bytes" portion. They are random... actual = re.sub(r"using [0-9]+ bytes", "using XXXX bytes", actual) # Mask out "using XiXXX KiB" portion. They are random... @@ -747,50 +858,60 @@ def test_bgp_summary(): # Remove extra summaries which exist with newer versions # # Remove summary lines (changed recently) - actual = re.sub(r'Total number.*', '', actual) - actual = re.sub(r'Displayed.*', '', actual) + actual = re.sub(r"Total number.*", "", actual) + actual = re.sub(r"Displayed.*", "", actual) # Remove IPv4 Unicast Summary (Title only) - actual = re.sub(r'IPv4 Unicast Summary:', '', actual) + actual = re.sub(r"IPv4 Unicast Summary:", "", actual) # Remove IPv4 Multicast Summary (all of it) - actual = re.sub(r'IPv4 Multicast Summary:', '', actual) - actual = re.sub(r'No IPv4 Multicast neighbor is configured', '', actual) + actual = re.sub(r"IPv4 Multicast Summary:", "", actual) + actual = re.sub(r"No IPv4 Multicast neighbor is configured", "", actual) # Remove IPv4 VPN Summary (all of it) - actual = re.sub(r'IPv4 VPN Summary:', '', actual) - actual = re.sub(r'No IPv4 VPN neighbor is configured', '', actual) + actual = re.sub(r"IPv4 VPN Summary:", "", actual) + actual = re.sub(r"No IPv4 VPN neighbor is configured", "", actual) # Remove IPv4 Encap Summary (all of it) - actual = re.sub(r'IPv4 Encap Summary:', '', actual) - actual = re.sub(r'No IPv4 Encap neighbor is configured', '', actual) + actual = re.sub(r"IPv4 Encap Summary:", "", actual) + actual = re.sub(r"No IPv4 Encap neighbor is configured", "", actual) # Remove Unknown Summary (all of it) - actual = re.sub(r'Unknown Summary:', '', actual) - actual = re.sub(r'No Unknown neighbor is configured', '', actual) + actual = re.sub(r"Unknown Summary:", "", actual) + actual = re.sub(r"No Unknown neighbor is configured", "", actual) - actual = re.sub(r'IPv4 labeled-unicast Summary:', '', actual) - actual = re.sub(r'No IPv4 labeled-unicast neighbor is configured', '', actual) + actual = re.sub(r"IPv4 labeled-unicast Summary:", "", actual) + actual = re.sub( + r"No IPv4 labeled-unicast neighbor is configured", "", actual + ) # Strip empty lines actual = actual.lstrip() actual = actual.rstrip() # # Fix newlines (make them all the same) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) # Generate Diff - diff = topotest.get_textdiff(actual, expected, + diff = topotest.get_textdiff( + actual, + expected, title1="actual SHOW IP BGP SUMMARY", - title2="expected SHOW IP BGP SUMMARY") + title2="expected SHOW IP BGP SUMMARY", + ) # Empty string if it matches, otherwise diff contains unified diff if diff: - sys.stderr.write('r%s failed SHOW IP BGP SUMMARY check:\n%s\n' % (i, diff)) + sys.stderr.write( + "r%s failed SHOW IP BGP SUMMARY check:\n%s\n" % (i, diff) + ) failures += 1 else: print("r%s ok" % i) - assert failures == 0, "SHOW IP BGP SUMMARY failed for router r%s:\n%s" % (i, diff) + assert failures == 0, "SHOW IP BGP SUMMARY failed for router r%s:\n%s" % ( + i, + diff, + ) # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -802,7 +923,7 @@ def test_bgp_ipv6_summary(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -811,15 +932,19 @@ def test_bgp_ipv6_summary(): print("******************************************\n") failures = 0 for i in range(1, 2): - refTableFile = '%s/r%s/show_bgp_ipv6_summary.ref' % (thisDir, i) + refTableFile = "%s/r%s/show_bgp_ipv6_summary.ref" % (thisDir, i) if os.path.isfile(refTableFile): # Read expected result from file expected = open(refTableFile).read().rstrip() # Fix newlines (make them all the same) - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) # Actual output from router - actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv6 summary" 2> /dev/null').rstrip() + actual = ( + net["r%s" % i] + .cmd('vtysh -c "show bgp ipv6 summary" 2> /dev/null') + .rstrip() + ) # Mask out "using XXiXX bytes" portion. They are random... actual = re.sub(r"using [0-9]+ bytes", "using XXXX bytes", actual) # Mask out "using XiXXX KiB" portion. They are random... @@ -828,51 +953,61 @@ def test_bgp_ipv6_summary(): # Remove extra summaries which exist with newer versions # # Remove summary lines (changed recently) - actual = re.sub(r'Total number.*', '', actual) - actual = re.sub(r'Displayed.*', '', actual) + actual = re.sub(r"Total number.*", "", actual) + actual = re.sub(r"Displayed.*", "", actual) # Remove IPv4 Unicast Summary (Title only) - actual = re.sub(r'IPv6 Unicast Summary:', '', actual) + actual = re.sub(r"IPv6 Unicast Summary:", "", actual) # Remove IPv4 Multicast Summary (all of it) - actual = re.sub(r'IPv6 Multicast Summary:', '', actual) - actual = re.sub(r'No IPv6 Multicast neighbor is configured', '', actual) + actual = re.sub(r"IPv6 Multicast Summary:", "", actual) + actual = re.sub(r"No IPv6 Multicast neighbor is configured", "", actual) # Remove IPv4 VPN Summary (all of it) - actual = re.sub(r'IPv6 VPN Summary:', '', actual) - actual = re.sub(r'No IPv6 VPN neighbor is configured', '', actual) + actual = re.sub(r"IPv6 VPN Summary:", "", actual) + actual = re.sub(r"No IPv6 VPN neighbor is configured", "", actual) # Remove IPv4 Encap Summary (all of it) - actual = re.sub(r'IPv6 Encap Summary:', '', actual) - actual = re.sub(r'No IPv6 Encap neighbor is configured', '', actual) + actual = re.sub(r"IPv6 Encap Summary:", "", actual) + actual = re.sub(r"No IPv6 Encap neighbor is configured", "", actual) # Remove Unknown Summary (all of it) - actual = re.sub(r'Unknown Summary:', '', actual) - actual = re.sub(r'No Unknown neighbor is configured', '', actual) + actual = re.sub(r"Unknown Summary:", "", actual) + actual = re.sub(r"No Unknown neighbor is configured", "", actual) # Remove Labeled Unicast Summary (all of it) - actual = re.sub(r'IPv6 labeled-unicast Summary:', '', actual) - actual = re.sub(r'No IPv6 labeled-unicast neighbor is configured', '', actual) + actual = re.sub(r"IPv6 labeled-unicast Summary:", "", actual) + actual = re.sub( + r"No IPv6 labeled-unicast neighbor is configured", "", actual + ) # Strip empty lines actual = actual.lstrip() actual = actual.rstrip() # # Fix newlines (make them all the same) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) # Generate Diff - diff = topotest.get_textdiff(actual, expected, + diff = topotest.get_textdiff( + actual, + expected, title1="actual SHOW BGP IPv6 SUMMARY", - title2="expected SHOW BGP IPv6 SUMMARY") + title2="expected SHOW BGP IPv6 SUMMARY", + ) # Empty string if it matches, otherwise diff contains unified diff if diff: - sys.stderr.write('r%s failed SHOW BGP IPv6 SUMMARY check:\n%s\n' % (i, diff)) + sys.stderr.write( + "r%s failed SHOW BGP IPv6 SUMMARY check:\n%s\n" % (i, diff) + ) failures += 1 else: print("r%s ok" % i) - assert failures == 0, "SHOW BGP IPv6 SUMMARY failed for router r%s:\n%s" % (i, diff) + assert failures == 0, "SHOW BGP IPv6 SUMMARY failed for router r%s:\n%s" % ( + i, + diff, + ) # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -884,7 +1019,7 @@ def test_bgp_ipv4(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -894,27 +1029,31 @@ def test_bgp_ipv4(): diffresult = {} for i in range(1, 2): success = 0 - for refTableFile in (glob.glob( - '%s/r%s/show_bgp_ipv4*.ref' % (thisDir, i))): + for refTableFile in glob.glob("%s/r%s/show_bgp_ipv4*.ref" % (thisDir, i)): if os.path.isfile(refTableFile): # Read expected result from file expected = open(refTableFile).read().rstrip() # Fix newlines (make them all the same) - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) # Actual output from router - actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv4" 2> /dev/null').rstrip() + actual = ( + net["r%s" % i].cmd('vtysh -c "show bgp ipv4" 2> /dev/null').rstrip() + ) # Remove summary line (changed recently) - actual = re.sub(r'Total number.*', '', actual) - actual = re.sub(r'Displayed.*', '', actual) + actual = re.sub(r"Total number.*", "", actual) + actual = re.sub(r"Displayed.*", "", actual) actual = actual.rstrip() # Fix newlines (make them all the same) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) # Generate Diff - diff = topotest.get_textdiff(actual, expected, + diff = topotest.get_textdiff( + actual, + expected, title1="actual SHOW BGP IPv4", - title2="expected SHOW BGP IPv4") + title2="expected SHOW BGP IPv4", + ) # Empty string if it matches, otherwise diff contains unified diff if diff: @@ -925,17 +1064,20 @@ def test_bgp_ipv4(): break if not success: - resultstr = 'No template matched.\n' + resultstr = "No template matched.\n" for f in diffresult.iterkeys(): - resultstr += ( - 'template %s: r%s failed SHOW BGP IPv4 check:\n%s\n' - % (f, i, diffresult[f])) + resultstr += "template %s: r%s failed SHOW BGP IPv4 check:\n%s\n" % ( + f, + i, + diffresult[f], + ) raise AssertionError( - "SHOW BGP IPv4 failed for router r%s:\n%s" % (i, resultstr)) + "SHOW BGP IPv4 failed for router r%s:\n%s" % (i, resultstr) + ) # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -947,7 +1089,7 @@ def test_bgp_ipv6(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -957,27 +1099,31 @@ def test_bgp_ipv6(): diffresult = {} for i in range(1, 2): success = 0 - for refTableFile in (glob.glob( - '%s/r%s/show_bgp_ipv6*.ref' % (thisDir, i))): + for refTableFile in glob.glob("%s/r%s/show_bgp_ipv6*.ref" % (thisDir, i)): if os.path.isfile(refTableFile): # Read expected result from file expected = open(refTableFile).read().rstrip() # Fix newlines (make them all the same) - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) # Actual output from router - actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv6" 2> /dev/null').rstrip() + actual = ( + net["r%s" % i].cmd('vtysh -c "show bgp ipv6" 2> /dev/null').rstrip() + ) # Remove summary line (changed recently) - actual = re.sub(r'Total number.*', '', actual) - actual = re.sub(r'Displayed.*', '', actual) + actual = re.sub(r"Total number.*", "", actual) + actual = re.sub(r"Displayed.*", "", actual) actual = actual.rstrip() # Fix newlines (make them all the same) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) # Generate Diff - diff = topotest.get_textdiff(actual, expected, + diff = topotest.get_textdiff( + actual, + expected, title1="actual SHOW BGP IPv6", - title2="expected SHOW BGP IPv6") + title2="expected SHOW BGP IPv6", + ) # Empty string if it matches, otherwise diff contains unified diff if diff: @@ -987,27 +1133,31 @@ def test_bgp_ipv6(): print("template %s matched: r%s ok" % (refTableFile, i)) if not success: - resultstr = 'No template matched.\n' + resultstr = "No template matched.\n" for f in diffresult.iterkeys(): - resultstr += ( - 'template %s: r%s failed SHOW BGP IPv6 check:\n%s\n' - % (f, i, diffresult[f])) + resultstr += "template %s: r%s failed SHOW BGP IPv6 check:\n%s\n" % ( + f, + i, + diffresult[f], + ) raise AssertionError( - "SHOW BGP IPv6 failed for router r%s:\n%s" % (i, resultstr)) + "SHOW BGP IPv6 failed for router r%s:\n%s" % (i, resultstr) + ) # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line # CLI(net) + def test_route_map(): global fatal_error global net - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -1016,32 +1166,42 @@ def test_route_map(): print("*******************************************************\n") failures = 0 for i in range(1, 2): - refroutemap = '%s/r%s/show_route_map.ref' % (thisDir, i) + refroutemap = "%s/r%s/show_route_map.ref" % (thisDir, i) if os.path.isfile(refroutemap): expected = open(refroutemap).read().rstrip() - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) - actual = net['r%s' %i].cmd('vtysh -c "show route-map" 2> /dev/null').rstrip() - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ( + net["r%s" % i].cmd('vtysh -c "show route-map" 2> /dev/null').rstrip() + ) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) - diff = topotest.get_textdiff(actual, expected, - title1="actual show route-map", - title2="expected show route-map") + diff = topotest.get_textdiff( + actual, + expected, + title1="actual show route-map", + title2="expected show route-map", + ) if diff: - sys.stderr.write('r%s failed show route-map command Check:\n%s\n' % (i, diff)) + sys.stderr.write( + "r%s failed show route-map command Check:\n%s\n" % (i, diff) + ) failures += 1 else: - print("r%s ok" %i) + print("r%s ok" % i) + + assert ( + failures == 0 + ), "Show route-map command failed for router r%s:\n%s" % (i, diff) - assert failures == 0, "Show route-map command failed for router r%s:\n%s" % (i, diff) def test_nexthop_groups_with_route_maps(): global fatal_error global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) print("\n\n** Verifying Nexthop Groups With Route-Maps") @@ -1050,14 +1210,18 @@ def test_nexthop_groups_with_route_maps(): ### Nexthop Group With Route-Map Tests # Create a lib nexthop-group - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group test" -c "nexthop 1.1.1.1" -c "nexthop 1.1.1.2"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group test" -c "nexthop 1.1.1.1" -c "nexthop 1.1.1.2"' + ) ## Route-Map Proto Source route_str = "2.2.2.1" src_str = "192.168.0.1" - net["r1"].cmd('vtysh -c "c t" -c "route-map NH-SRC permit 111" -c "set src %s"' % src_str) + net["r1"].cmd( + 'vtysh -c "c t" -c "route-map NH-SRC permit 111" -c "set src %s"' % src_str + ) net["r1"].cmd('vtysh -c "c t" -c "ip protocol sharp route-map NH-SRC"') net["r1"].cmd('vtysh -c "sharp install routes %s nexthop-group test 1"' % route_str) @@ -1066,14 +1230,19 @@ def test_nexthop_groups_with_route_maps(): # Only a valid test on linux using nexthop objects if sys.platform.startswith("linux"): - output = net["r1"].cmd('ip route show %s/32' % route_str) + output = net["r1"].cmd("ip route show %s/32" % route_str) match = re.search(r"src %s" % src_str, output) - assert match is not None, "Route %s/32 not installed with src %s" % (route_str, src_str) + assert match is not None, "Route %s/32 not installed with src %s" % ( + route_str, + src_str, + ) # Remove NHG routes and route-map net["r1"].cmd('vtysh -c "sharp remove routes %s 1"' % route_str) net["r1"].cmd('vtysh -c "c t" -c "no ip protocol sharp route-map NH-SRC"') - net["r1"].cmd('vtysh -c "c t" -c "no route-map NH-SRC permit 111" -c "set src %s"' % src_str) + net["r1"].cmd( + 'vtysh -c "c t" -c "no route-map NH-SRC permit 111" -c "set src %s"' % src_str + ) net["r1"].cmd('vtysh -c "c t" -c "no route-map NH-SRC"') ## Route-Map Deny/Permit with same nexthop group @@ -1081,18 +1250,26 @@ def test_nexthop_groups_with_route_maps(): permit_route_str = "3.3.3.1" deny_route_str = "3.3.3.2" - net["r1"].cmd('vtysh -c "c t" -c "ip prefix-list NOPE seq 5 permit %s/32"' % permit_route_str) - net["r1"].cmd('vtysh -c "c t" -c "route-map NOPE permit 111" -c "match ip address prefix-list NOPE"') + net["r1"].cmd( + 'vtysh -c "c t" -c "ip prefix-list NOPE seq 5 permit %s/32"' % permit_route_str + ) + net["r1"].cmd( + 'vtysh -c "c t" -c "route-map NOPE permit 111" -c "match ip address prefix-list NOPE"' + ) net["r1"].cmd('vtysh -c "c t" -c "route-map NOPE deny 222"') net["r1"].cmd('vtysh -c "c t" -c "ip protocol sharp route-map NOPE"') # This route should be permitted - net["r1"].cmd('vtysh -c "sharp install routes %s nexthop-group test 1"' % permit_route_str) + net["r1"].cmd( + 'vtysh -c "sharp install routes %s nexthop-group test 1"' % permit_route_str + ) verify_route_nexthop_group("%s/32" % permit_route_str) # This route should be denied - net["r1"].cmd('vtysh -c "sharp install routes %s nexthop-group test 1"' % deny_route_str) + net["r1"].cmd( + 'vtysh -c "sharp install routes %s nexthop-group test 1"' % deny_route_str + ) nhg_id = route_get_nhg_id(deny_route_str) output = net["r1"].cmd('vtysh -c "show nexthop-group rib %d"' % nhg_id) @@ -1110,14 +1287,18 @@ def test_nexthop_groups_with_route_maps(): net["r1"].cmd('vtysh -c "c t" -c "no route-map NOPE permit 111"') net["r1"].cmd('vtysh -c "c t" -c "no route-map NOPE deny 222"') net["r1"].cmd('vtysh -c "c t" -c "no route-map NOPE"') - net["r1"].cmd('vtysh -c "c t" -c "no ip prefix-list NOPE seq 5 permit %s/32"' % permit_route_str) + net["r1"].cmd( + 'vtysh -c "c t" -c "no ip prefix-list NOPE seq 5 permit %s/32"' + % permit_route_str + ) + def test_nexthop_group_replace(): global fatal_error global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) print("\n\n** Verifying Nexthop Groups") @@ -1127,7 +1308,9 @@ def test_nexthop_group_replace(): ## 2-Way ECMP Directly Connected - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group replace" -c "nexthop 1.1.1.1 r1-eth1 onlink" -c "nexthop 1.1.1.2 r1-eth2 onlink"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group replace" -c "nexthop 1.1.1.1 r1-eth1 onlink" -c "nexthop 1.1.1.2 r1-eth2 onlink"' + ) # Create with sharpd using nexthop-group net["r1"].cmd('vtysh -c "sharp install routes 3.3.3.1 nexthop-group replace 1"') @@ -1135,21 +1318,24 @@ def test_nexthop_group_replace(): verify_route_nexthop_group("3.3.3.1/32") # Change the nexthop group - net["r1"].cmd('vtysh -c "c t" -c "nexthop-group replace" -c "no nexthop 1.1.1.1 r1-eth1 onlink" -c "nexthop 1.1.1.3 r1-eth1 onlink" -c "nexthop 1.1.1.4 r1-eth4 onlink"') + net["r1"].cmd( + 'vtysh -c "c t" -c "nexthop-group replace" -c "no nexthop 1.1.1.1 r1-eth1 onlink" -c "nexthop 1.1.1.3 r1-eth1 onlink" -c "nexthop 1.1.1.4 r1-eth4 onlink"' + ) # Verify it updated. We can just check install and ecmp count here. verify_route_nexthop_group("3.3.3.1/32", False, 3) + def test_mpls_interfaces(): global fatal_error global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) # Skip if no LDP installed or old kernel - if (net['r1'].daemon_available('ldpd') == False): + if net["r1"].daemon_available("ldpd") == False: pytest.skip("No MPLS or kernel < 4.5") thisDir = os.path.dirname(os.path.realpath(__file__)) @@ -1158,40 +1344,51 @@ def test_mpls_interfaces(): print("******************************************\n") failures = 0 for i in range(1, 2): - refTableFile = '%s/r%s/show_mpls_ldp_interface.ref' % (thisDir, i) + refTableFile = "%s/r%s/show_mpls_ldp_interface.ref" % (thisDir, i) if os.path.isfile(refTableFile): # Read expected result from file expected = open(refTableFile).read().rstrip() # Fix newlines (make them all the same) - expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1) + expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) # Actual output from router - actual = net['r%s' % i].cmd('vtysh -c "show mpls ldp interface" 2> /dev/null').rstrip() + actual = ( + net["r%s" % i] + .cmd('vtysh -c "show mpls ldp interface" 2> /dev/null') + .rstrip() + ) # Mask out Timer in Uptime actual = re.sub(r" [0-9][0-9]:[0-9][0-9]:[0-9][0-9] ", " xx:xx:xx ", actual) # Fix newlines (make them all the same) - actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1) + actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) # Generate Diff - diff = topotest.get_textdiff(actual, expected, + diff = topotest.get_textdiff( + actual, + expected, title1="actual MPLS LDP interface status", - title2="expected MPLS LDP interface status") + title2="expected MPLS LDP interface status", + ) # Empty string if it matches, otherwise diff contains unified diff if diff: - sys.stderr.write('r%s failed MPLS LDP Interface status Check:\n%s\n' % (i, diff)) + sys.stderr.write( + "r%s failed MPLS LDP Interface status Check:\n%s\n" % (i, diff) + ) failures += 1 else: print("r%s ok" % i) - if failures>0: + if failures > 0: fatal_error = "MPLS LDP Interface status failed" - assert failures == 0, "MPLS LDP Interface status failed for router r%s:\n%s" % (i, diff) + assert ( + failures == 0 + ), "MPLS LDP Interface status failed for router r%s:\n%s" % (i, diff) # Make sure that all daemons are running for i in range(1, 2): - fatal_error = net['r%s' % i].checkRouterRunning() + fatal_error = net["r%s" % i].checkRouterRunning() assert fatal_error == "", fatal_error # For debugging after starting FRR daemons, uncomment the next line @@ -1203,58 +1400,60 @@ def test_shutdown_check_stderr(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) print("\n\n** Verifying unexpected STDERR output from daemons") print("******************************************\n") - if os.environ.get('TOPOTESTS_CHECK_STDERR') is None: - print("SKIPPED final check on StdErr output: Disabled (TOPOTESTS_CHECK_STDERR undefined)\n") - pytest.skip('Skipping test for Stderr output') + if os.environ.get("TOPOTESTS_CHECK_STDERR") is None: + print( + "SKIPPED final check on StdErr output: Disabled (TOPOTESTS_CHECK_STDERR undefined)\n" + ) + pytest.skip("Skipping test for Stderr output") thisDir = os.path.dirname(os.path.realpath(__file__)) print("thisDir=" + thisDir) - net['r1'].stopRouter() + net["r1"].stopRouter() - log = net['r1'].getStdErr('ripd') + log = net["r1"].getStdErr("ripd") if log: print("\nRIPd StdErr Log:\n" + log) - log = net['r1'].getStdErr('ripngd') + log = net["r1"].getStdErr("ripngd") if log: print("\nRIPngd StdErr Log:\n" + log) - log = net['r1'].getStdErr('ospfd') + log = net["r1"].getStdErr("ospfd") if log: print("\nOSPFd StdErr Log:\n" + log) - log = net['r1'].getStdErr('ospf6d') + log = net["r1"].getStdErr("ospf6d") if log: print("\nOSPF6d StdErr Log:\n" + log) - log = net['r1'].getStdErr('isisd') + log = net["r1"].getStdErr("isisd") if log: print("\nISISd StdErr Log:\n" + log) - log = net['r1'].getStdErr('bgpd') + log = net["r1"].getStdErr("bgpd") if log: print("\nBGPd StdErr Log:\n" + log) - log = net['r1'].getStdErr('nhrpd') + log = net["r1"].getStdErr("nhrpd") if log: print("\nNHRPd StdErr Log:\n" + log) - log = net['r1'].getStdErr('pbrd') + log = net["r1"].getStdErr("pbrd") if log: print("\nPBRd StdErr Log:\n" + log) - log = net['r1'].getStdErr('babeld') + log = net["r1"].getStdErr("babeld") if log: print("\nBABELd StdErr Log:\n" + log) - if (net['r1'].daemon_available('ldpd')): - log = net['r1'].getStdErr('ldpd') + if net["r1"].daemon_available("ldpd"): + log = net["r1"].getStdErr("ldpd") if log: print("\nLDPd StdErr Log:\n" + log) - log = net['r1'].getStdErr('zebra') + log = net["r1"].getStdErr("zebra") if log: print("\nZebra StdErr Log:\n" + log) @@ -1264,23 +1463,27 @@ def test_shutdown_check_memleak(): global net # Skip if previous fatal error condition is raised - if (fatal_error != ""): + if fatal_error != "": pytest.skip(fatal_error) - if os.environ.get('TOPOTESTS_CHECK_MEMLEAK') is None: - print("SKIPPED final check on Memory leaks: Disabled (TOPOTESTS_CHECK_MEMLEAK undefined)\n") - pytest.skip('Skipping test for memory leaks') - + if os.environ.get("TOPOTESTS_CHECK_MEMLEAK") is None: + print( + "SKIPPED final check on Memory leaks: Disabled (TOPOTESTS_CHECK_MEMLEAK undefined)\n" + ) + pytest.skip("Skipping test for memory leaks") + thisDir = os.path.dirname(os.path.realpath(__file__)) for i in range(1, 2): - net['r%s' % i].stopRouter() - net['r%s' % i].report_memory_leaks(os.environ.get('TOPOTESTS_CHECK_MEMLEAK'), os.path.basename(__file__)) + net["r%s" % i].stopRouter() + net["r%s" % i].report_memory_leaks( + os.environ.get("TOPOTESTS_CHECK_MEMLEAK"), os.path.basename(__file__) + ) -if __name__ == '__main__': +if __name__ == "__main__": - setLogLevel('info') + setLogLevel("info") # To suppress tracebacks, either use the following pytest call or add "--tb=no" to cli # retval = pytest.main(["-s", "--tb=no"]) retval = pytest.main(["-s"]) |
