summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.git-blame-ignore-revs1
-rw-r--r--tests/topotests/all-protocol-startup/test_all_protocol_startup.py24
-rw-r--r--tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py3
-rw-r--r--tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py30
-rw-r--r--tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py26
-rw-r--r--tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py26
-rw-r--r--tests/topotests/bgp-evpn-mh/test_evpn_mh.py21
-rwxr-xr-xtests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py6
-rw-r--r--tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py7
-rw-r--r--tests/topotests/bgp-route-map/test_route_map_topo1.py33
-rw-r--r--tests/topotests/bgp-route-map/test_route_map_topo2.py64
-rw-r--r--tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py18
-rw-r--r--tests/topotests/bgp_features/test_bgp_features.py254
-rw-r--r--tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py158
-rw-r--r--tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py31
-rw-r--r--tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py12
-rw-r--r--tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py38
-rw-r--r--tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py6
-rw-r--r--tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py53
-rw-r--r--tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py17
-rw-r--r--tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py2
-rw-r--r--tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py7
-rwxr-xr-xtests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py343
-rwxr-xr-xtests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py378
-rw-r--r--tests/topotests/ldp-topo1/test_ldp_topo1.py8
-rw-r--r--tests/topotests/lib/common_config.py27
-rw-r--r--tests/topotests/lib/ospf.py12
-rwxr-xr-xtests/topotests/lib/test/test_json.py229
-rw-r--r--tests/topotests/lib/topogen.py4
-rw-r--r--tests/topotests/lib/topotest.py75
-rw-r--r--tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py6
-rw-r--r--tests/topotests/ospf-topo1/test_ospf_topo1.py29
-rw-r--r--tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py12
-rw-r--r--tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py609
-rw-r--r--tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py14
-rw-r--r--tests/topotests/ospf_basic_functionality/test_ospf_single_area.py253
-rw-r--r--tests/topotests/rip-topo1/test_rip_topo1.py8
-rw-r--r--tests/topotests/ripng-topo1/test_ripng_topo1.py8
-rw-r--r--tests/topotests/zebra_netlink/test_zebra_netlink.py7
39 files changed, 1753 insertions, 1106 deletions
diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
index 61bc0ade65..c3a0fdedf0 100644
--- a/.git-blame-ignore-revs
+++ b/.git-blame-ignore-revs
@@ -2,6 +2,7 @@
# git blame --ignore-revs-file .git-blame-ignore-revs <...>
# or to make it permanent
# git config blame.ignoreRevsFile .git-blame-ignore-revs
+9fa6ec14737b94fdfb41539d96c7e4f84f3514b6
701a01920eee5431d2052aad92aefbdf50ac2139
bf2394f08bdc91a6cbd3784a1bfa3af3247bb06f
0157c327715ca367d13b7f02b2981f3484ccdeeb
diff --git a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py
index 08378d9b58..ab9358408e 100644
--- a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py
+++ b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py
@@ -1016,6 +1016,7 @@ def test_bgp_ipv6_summary():
# For debugging after starting FRR daemons, uncomment the next line
# CLI(net)
+
def test_nht():
print("\n\n**** Test that nexthop tracking is at least nominally working ****\n")
@@ -1026,13 +1027,16 @@ def test_nht():
expected = open(nhtFile).read().rstrip()
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
- actual = (net["r%s" %i].cmd('vtysh -c "show ip nht" 2> /dev/null').rstrip())
+ actual = net["r%s" % i].cmd('vtysh -c "show ip nht" 2> /dev/null').rstrip()
actual = re.sub(r"fd [0-9][0-9]", "fd XX", actual)
actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1)
- diff = topotest.get_textdiff(actual, expected,
- title1="Actual `show ip nht`",
- title2="Expected `show ip nht`")
+ diff = topotest.get_textdiff(
+ actual,
+ expected,
+ title1="Actual `show ip nht`",
+ title2="Expected `show ip nht`",
+ )
if diff:
assert 0, "r%s failed ip nht check:\n%s\n" % (i, diff)
@@ -1043,19 +1047,23 @@ def test_nht():
expected = open(nhtFile).read().rstrip()
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
- actual = (net["r%s" %i].cmd('vtysh -c "show ipv6 nht" 2> /dev/null').rstrip())
+ actual = net["r%s" % i].cmd('vtysh -c "show ipv6 nht" 2> /dev/null').rstrip()
actual = re.sub(r"fd [0-9][0-9]", "fd XX", actual)
actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1)
- diff = topotest.get_textdiff(actual, expected,
- title1="Actual `show ip nht`",
- title2="Expected `show ip nht`")
+ diff = topotest.get_textdiff(
+ actual,
+ expected,
+ title1="Actual `show ip nht`",
+ title2="Expected `show ip nht`",
+ )
if diff:
assert 0, "r%s failed ipv6 nht check:\n%s\n" % (i, diff)
else:
print("show ipv6 nht is ok\n")
+
def test_bgp_ipv4():
global fatal_error
global net
diff --git a/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py b/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py
index 595132214b..98a5033f53 100644
--- a/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py
+++ b/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py
@@ -74,7 +74,8 @@ def setup_module(mod):
for rname, router in router_list.items():
router.load_config(
- TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname)),
+ TopoRouter.RD_ZEBRA,
+ os.path.join(CWD, "{}/zebra.conf".format(rname)),
)
router.load_config(
TopoRouter.RD_BFD, os.path.join(CWD, "{}/bfdd.conf".format(rname))
diff --git a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
index b3b7256ac4..b701a0d61e 100644
--- a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
+++ b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
@@ -282,10 +282,26 @@ def test_BGP_config_with_invalid_ASN_p2(request):
# Api call to modify AS number
input_dict = {
- "r1": {"bgp": {"local_as": 0,}},
- "r2": {"bgp": {"local_as": 0,}},
- "r3": {"bgp": {"local_as": 0,}},
- "r4": {"bgp": {"local_as": 64000,}},
+ "r1": {
+ "bgp": {
+ "local_as": 0,
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": 0,
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": 0,
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": 64000,
+ }
+ },
}
result = modify_as_number(tgen, topo, input_dict)
try:
@@ -819,7 +835,11 @@ def test_bgp_with_loopback_interface(request):
# Adding ['source_link'] = 'lo' key:value pair
topo["routers"][routerN]["bgp"]["address_family"]["ipv4"]["unicast"][
"neighbor"
- ][bgp_neighbor]["dest_link"] = {"lo": {"source_link": "lo",}}
+ ][bgp_neighbor]["dest_link"] = {
+ "lo": {
+ "source_link": "lo",
+ }
+ }
# Creating configuration from JSON
build_config_from_json(tgen, topo)
diff --git a/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py b/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py
index 28a1b98eb3..353df0684b 100644
--- a/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py
+++ b/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py
@@ -273,8 +273,20 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type):
"r3": {
"bgp": {
"address_family": {
- "ipv4": {"unicast": {"maximum_paths": {"ebgp": ecmp_num,}}},
- "ipv6": {"unicast": {"maximum_paths": {"ebgp": ecmp_num,}}},
+ "ipv4": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": ecmp_num,
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": ecmp_num,
+ }
+ }
+ },
}
}
}
@@ -303,7 +315,7 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type):
input_dict_1,
next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],
protocol=protocol,
- count_only=True
+ count_only=True,
)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
@@ -371,8 +383,8 @@ def test_ecmp_after_clear_bgp(request, ecmp_num, test_type):
def test_ecmp_remove_redistribute_static(request):
- """ Verify routes are cleared from BGP and RIB table of DUT when
- redistribute static configuration is removed."""
+ """Verify routes are cleared from BGP and RIB table of DUT when
+ redistribute static configuration is removed."""
tc_name = request.node.name
write_test_header(tc_name)
@@ -481,8 +493,8 @@ def test_ecmp_remove_redistribute_static(request):
@pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
def test_ecmp_shut_bgp_neighbor(request, test_type):
- """ Shut BGP neigbors one by one and verify BGP and routing table updated
- accordingly in DUT """
+ """Shut BGP neigbors one by one and verify BGP and routing table updated
+ accordingly in DUT"""
tc_name = request.node.name
write_test_header(tc_name)
diff --git a/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py b/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py
index 4d8003c4be..2f73bdb1b8 100644
--- a/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py
+++ b/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py
@@ -274,8 +274,20 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type):
"r3": {
"bgp": {
"address_family": {
- "ipv4": {"unicast": {"maximum_paths": {"ibgp": ecmp_num,}}},
- "ipv6": {"unicast": {"maximum_paths": {"ibgp": ecmp_num,}}},
+ "ipv4": {
+ "unicast": {
+ "maximum_paths": {
+ "ibgp": ecmp_num,
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "maximum_paths": {
+ "ibgp": ecmp_num,
+ }
+ }
+ },
}
}
}
@@ -304,7 +316,7 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type):
input_dict_1,
next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],
protocol=protocol,
- count_only=True
+ count_only=True,
)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
@@ -372,8 +384,8 @@ def test_ecmp_after_clear_bgp(request, ecmp_num, test_type):
def test_ecmp_remove_redistribute_static(request):
- """ Verify routes are cleared from BGP and RIB table of DUT when
- redistribute static configuration is removed."""
+ """Verify routes are cleared from BGP and RIB table of DUT when
+ redistribute static configuration is removed."""
tc_name = request.node.name
write_test_header(tc_name)
@@ -482,8 +494,8 @@ def test_ecmp_remove_redistribute_static(request):
@pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
def test_ecmp_shut_bgp_neighbor(request, test_type):
- """ Shut BGP neigbors one by one and verify BGP and routing table updated
- accordingly in DUT """
+ """Shut BGP neigbors one by one and verify BGP and routing table updated
+ accordingly in DUT"""
tc_name = request.node.name
write_test_header(tc_name)
diff --git a/tests/topotests/bgp-evpn-mh/test_evpn_mh.py b/tests/topotests/bgp-evpn-mh/test_evpn_mh.py
index 4c56d1a02d..6a24684649 100644
--- a/tests/topotests/bgp-evpn-mh/test_evpn_mh.py
+++ b/tests/topotests/bgp-evpn-mh/test_evpn_mh.py
@@ -658,10 +658,11 @@ def test_evpn_mac():
assertmsg = '"{}" remote MAC content incorrect'.format(tor.name)
assert result is None, assertmsg
+
def check_df_role(dut, esi, role):
- '''
+ """
Return error string if the df role on the dut is different
- '''
+ """
es_json = dut.vtysh_cmd("show evpn es %s json" % esi)
es = json.loads(es_json)
@@ -676,12 +677,13 @@ def check_df_role(dut, esi, role):
return None
+
def test_evpn_df():
- '''
+ """
1. Check the DF role on all the PEs on rack-1.
2. Increase the DF preference on the non-DF and check if it becomes
the DF winner.
- '''
+ """
tgen = get_topogen()
@@ -720,10 +722,11 @@ def test_evpn_df():
# tgen.mininet_cli()
+
def check_protodown_rc(dut, protodown_rc):
- '''
+ """
check if specified protodown reason code is set
- '''
+ """
out = dut.vtysh_cmd("show evpn json")
@@ -739,13 +742,14 @@ def check_protodown_rc(dut, protodown_rc):
return None
+
def test_evpn_uplink_tracking():
- '''
+ """
1. Wait for access ports to come out of startup-delay
2. disable uplinks and check if access ports have been protodowned
3. enable uplinks and check if access ports have been moved out
of protodown
- '''
+ """
tgen = get_topogen()
@@ -778,6 +782,7 @@ def test_evpn_uplink_tracking():
assertmsg = '"{}" protodown rc incorrect'.format(dut_name)
assert result is None, assertmsg
+
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
diff --git a/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py b/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py
index 5098808d55..9a38158b2b 100755
--- a/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py
+++ b/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py
@@ -310,8 +310,10 @@ def ip_learn_test(tgen, host, local, remote, ip_addr):
assertmsg = "local learned mac wrong type: {} ".format(mac_type)
assert mac_type == "local", assertmsg
- assertmsg = "learned address mismatch with configured address host: {} learned: {}".format(
- ip_addr, learned_ip
+ assertmsg = (
+ "learned address mismatch with configured address host: {} learned: {}".format(
+ ip_addr, learned_ip
+ )
)
assert ip_addr == learned_ip, assertmsg
diff --git a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py
index 607b036c6a..a9541a55c5 100644
--- a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py
+++ b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py
@@ -238,9 +238,10 @@ def test_next_hop_attribute(request):
result = verify_rib(
tgen, addr_type, dut, input_dict, protocol=protocol, expected=False
)
- assert result is not True, (
- "Testcase {} : Failed \n Error: "
- "{} routes are not present in RIB".format(addr_type, tc_name)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Error: " "{} routes are not present in RIB".format(
+ addr_type, tc_name
)
# Configure next-hop-self to bgp neighbor
diff --git a/tests/topotests/bgp-route-map/test_route_map_topo1.py b/tests/topotests/bgp-route-map/test_route_map_topo1.py
index 1aa951edaa..0158e24d31 100644
--- a/tests/topotests/bgp-route-map/test_route_map_topo1.py
+++ b/tests/topotests/bgp-route-map/test_route_map_topo1.py
@@ -807,7 +807,9 @@ def test_route_map_multiple_seq_different_match_set_clause_p0(request):
"prefix_lists": "pf_list_2_{}".format(addr_type)
}
},
- "set": {"locPrf": 150,},
+ "set": {
+ "locPrf": 150,
+ },
},
{
"action": "permit",
@@ -906,7 +908,17 @@ def test_route_map_multiple_seq_different_match_set_clause_p0(request):
dut = "r3"
protocol = "bgp"
input_dict = {
- "r3": {"route_maps": {"rmap_match_pf_list1": [{"set": {"metric": 50,}}],}}
+ "r3": {
+ "route_maps": {
+ "rmap_match_pf_list1": [
+ {
+ "set": {
+ "metric": 50,
+ }
+ }
+ ],
+ }
+ }
}
static_routes = [NETWORK[adt][0]]
@@ -1093,7 +1105,14 @@ def test_route_map_set_only_no_match_p0(request):
input_dict_4 = {
"r3": {
"route_maps": {
- "rmap_match_pf_1": [{"action": "permit", "set": {"metric": 50,}}]
+ "rmap_match_pf_1": [
+ {
+ "action": "permit",
+ "set": {
+ "metric": 50,
+ },
+ }
+ ]
}
}
}
@@ -1210,7 +1229,13 @@ def test_route_map_match_only_no_set_p0(request):
"r1": {
"route_maps": {
"rmap_match_pf_1_{}".format(addr_type): [
- {"action": "permit", "set": {"metric": 50, "locPrf": 150,}}
+ {
+ "action": "permit",
+ "set": {
+ "metric": 50,
+ "locPrf": 150,
+ },
+ }
]
}
}
diff --git a/tests/topotests/bgp-route-map/test_route_map_topo2.py b/tests/topotests/bgp-route-map/test_route_map_topo2.py
index 3056aa29f3..958eceba62 100644
--- a/tests/topotests/bgp-route-map/test_route_map_topo2.py
+++ b/tests/topotests/bgp-route-map/test_route_map_topo2.py
@@ -268,12 +268,20 @@ def test_rmap_match_prefix_list_permit_in_and_outbound_prefixes_p0():
"prefix_lists": {
"ipv4": {
"pf_list_1_ipv4": [
- {"seqid": 10, "network": "any", "action": "permit",}
+ {
+ "seqid": 10,
+ "network": "any",
+ "action": "permit",
+ }
]
},
"ipv6": {
"pf_list_1_ipv6": [
- {"seqid": 10, "network": "any", "action": "permit",}
+ {
+ "seqid": 10,
+ "network": "any",
+ "action": "permit",
+ }
]
},
}
@@ -472,7 +480,11 @@ def test_modify_set_match_clauses_in_rmap_p0():
"prefix_lists": {
"ipv4": {
"pf_list_1_ipv4": [
- {"seqid": 10, "network": "any", "action": "permit",}
+ {
+ "seqid": 10,
+ "network": "any",
+ "action": "permit",
+ }
],
"pf_list_2_ipv4": [
{"seqid": 10, "network": "any", "action": "permit"}
@@ -480,7 +492,11 @@ def test_modify_set_match_clauses_in_rmap_p0():
},
"ipv6": {
"pf_list_1_ipv6": [
- {"seqid": 10, "network": "any", "action": "permit",}
+ {
+ "seqid": 10,
+ "network": "any",
+ "action": "permit",
+ }
],
"pf_list_2_ipv6": [
{"seqid": 10, "network": "any", "action": "permit"}
@@ -506,7 +522,9 @@ def test_modify_set_match_clauses_in_rmap_p0():
"prefix_lists": "pf_list_1_{}".format(addr_type)
}
},
- "set": {"locPrf": 150,},
+ "set": {
+ "locPrf": 150,
+ },
}
],
"rmap_match_pf_2_{}".format(addr_type): [
@@ -666,7 +684,9 @@ def test_modify_set_match_clauses_in_rmap_p0():
"prefix_lists": "pf_list_1_{}".format(addr_type)
}
},
- "set": {"locPrf": 1000,},
+ "set": {
+ "locPrf": 1000,
+ },
}
],
"rmap_match_pf_2_{}".format(addr_type): [
@@ -816,12 +836,20 @@ def test_modify_prefix_list_referenced_by_rmap_p0():
"prefix_lists": {
"ipv4": {
"pf_list_1_ipv4": [
- {"seqid": 10, "network": "any", "action": "permit",}
+ {
+ "seqid": 10,
+ "network": "any",
+ "action": "permit",
+ }
]
},
"ipv6": {
"pf_list_1_ipv6": [
- {"seqid": 100, "network": "any", "action": "permit",}
+ {
+ "seqid": 100,
+ "network": "any",
+ "action": "permit",
+ }
]
},
}
@@ -1090,7 +1118,9 @@ def test_remove_prefix_list_referenced_by_rmap_p0():
"prefix_lists": "pf_list_1_{}".format(addr_type)
}
},
- "set": {"locPrf": 150,},
+ "set": {
+ "locPrf": 150,
+ },
}
],
"rmap_match_pf_2_{}".format(addr_type): [
@@ -1894,7 +1924,9 @@ def test_multiple_match_statement_in_route_map_logical_ANDed_p1():
"prefix_lists": "pf_list_1_{}".format(addr_type)
}
},
- "set": {"locPrf": 150,},
+ "set": {
+ "locPrf": 150,
+ },
}
]
}
@@ -1921,7 +1953,9 @@ def test_multiple_match_statement_in_route_map_logical_ANDed_p1():
}
}
},
- "set": {"locPrf": 150,},
+ "set": {
+ "locPrf": 150,
+ },
}
]
}
@@ -2048,7 +2082,9 @@ def test_add_remove_rmap_to_specific_neighbor_p0():
"prefix_lists": "pf_list_1_{}".format(addr_type)
}
},
- "set": {"locPrf": 150,},
+ "set": {
+ "locPrf": 150,
+ },
}
]
}
@@ -3505,7 +3541,9 @@ def test_create_rmap_match_prefix_list_to_deny_in_and_outbound_prefixes_p0():
"prefix_lists": "pf_list_1_{}".format(addr_type)
}
},
- "set": {"locPrf": 150,},
+ "set": {
+ "locPrf": 150,
+ },
}
],
"rmap_match_pf_2_{}".format(addr_type): [
diff --git a/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py b/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py
index 36f1d8cd56..71f64e9b70 100644
--- a/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py
+++ b/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py
@@ -90,7 +90,11 @@ def test_vrf_route_leak():
# Test DONNA VRF.
expect = {
- "10.0.0.0/24": [{"protocol": "connected",}],
+ "10.0.0.0/24": [
+ {
+ "protocol": "connected",
+ }
+ ],
"10.0.1.0/24": [
{"protocol": "bgp", "selected": True, "nexthops": [{"fib": True}]}
],
@@ -111,11 +115,19 @@ def test_vrf_route_leak():
"10.0.0.0/24": [
{"protocol": "bgp", "selected": True, "nexthops": [{"fib": True}]}
],
- "10.0.1.0/24": [{"protocol": "connected",}],
+ "10.0.1.0/24": [
+ {
+ "protocol": "connected",
+ }
+ ],
"10.0.2.0/24": [
{"protocol": "bgp", "selected": True, "nexthops": [{"fib": True}]}
],
- "10.0.3.0/24": [{"protocol": "connected",}],
+ "10.0.3.0/24": [
+ {
+ "protocol": "connected",
+ }
+ ],
}
test_func = partial(
diff --git a/tests/topotests/bgp_features/test_bgp_features.py b/tests/topotests/bgp_features/test_bgp_features.py
index bc821bd7a0..5f3809c2b3 100644
--- a/tests/topotests/bgp_features/test_bgp_features.py
+++ b/tests/topotests/bgp_features/test_bgp_features.py
@@ -753,41 +753,71 @@ def test_bgp_delayopen_without():
pytest.skip(tgen.errors)
# part 1: no delay r1 <=> no delay r4
- logger.info("Starting optional test of BGP functionality without DelayOpenTimer enabled to establish a reference for following tests")
+ logger.info(
+ "Starting optional test of BGP functionality without DelayOpenTimer enabled to establish a reference for following tests"
+ )
# 1.1 enable peering shutdown
logger.info("Enable shutdown of peering between r1 and r4")
- tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"')
- tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"')
+ tgen.net["r1"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"'
+ )
+ tgen.net["r4"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"'
+ )
# 1.2 wait for peers to shut down (poll output)
for router_num in [1, 4]:
- logger.info("Checking BGP summary after enabling shutdown of peering on r{}".format(router_num))
+ logger.info(
+ "Checking BGP summary after enabling shutdown of peering on r{}".format(
+ router_num
+ )
+ )
router = tgen.gears["r{}".format(router_num)]
- reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num))
+ reffile = os.path.join(
+ CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num)
+ )
expected = json.loads(open(reffile).read())
- test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected)
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show ip bgp summary json", expected
+ )
_, res = topotest.run_and_expect(test_func, None, count=3, wait=1)
assertmsg = "BGP session on r{} did not shut down peer".format(router_num)
assert res is None, assertmsg
# 1.3 disable peering shutdown
logger.info("Disable shutdown of peering between r1 and r4")
- tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"')
- tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"')
+ tgen.net["r1"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"'
+ )
+ tgen.net["r4"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"'
+ )
# 1.4 wait for peers to establish connection (poll output)
for router_num in [1, 4]:
- logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num))
+ logger.info(
+ "Checking BGP summary after disabling shutdown of peering on r{}".format(
+ router_num
+ )
+ )
router = tgen.gears["r{}".format(router_num)]
- reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num))
+ reffile = os.path.join(
+ CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num)
+ )
expected = json.loads(open(reffile).read())
- test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected)
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show ip bgp summary json", expected
+ )
_, res = topotest.run_and_expect(test_func, None, count=5, wait=1)
- assertmsg = "BGP session on r{} did not establish a connection with peer".format(router_num)
+ assertmsg = (
+ "BGP session on r{} did not establish a connection with peer".format(
+ router_num
+ )
+ )
assert res is None, assertmsg
- #tgen.mininet_cli()
+ # tgen.mininet_cli()
# end test_bgp_delayopen_without
@@ -800,65 +830,107 @@ def test_bgp_delayopen_singular():
pytest.skip(tgen.errors)
# part 2: delay 240s r1 <=> no delay r4
- logger.info("Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on one side of the peering")
+ logger.info(
+ "Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on one side of the peering"
+ )
# 2.1 enable peering shutdown
logger.info("Enable shutdown of peering between r1 and r4")
- tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"')
- tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"')
+ tgen.net["r1"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"'
+ )
+ tgen.net["r4"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"'
+ )
# 2.2 wait for peers to shut down (poll output)
for router_num in [1, 4]:
- logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num))
+ logger.info(
+ "Checking BGP summary after disabling shutdown of peering on r{}".format(
+ router_num
+ )
+ )
router = tgen.gears["r{}".format(router_num)]
- reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num))
+ reffile = os.path.join(
+ CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num)
+ )
expected = json.loads(open(reffile).read())
- test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected)
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show ip bgp summary json", expected
+ )
_, res = topotest.run_and_expect(test_func, None, count=3, wait=1)
assertmsg = "BGP session on r{} did not shut down peer".format(router_num)
assert res is None, assertmsg
# 2.3 set delayopen on R1 to 240
logger.info("Setting DelayOpenTime for neighbor r4 to 240 seconds on r1")
- tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 timers delayopen 240"')
+ tgen.net["r1"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 timers delayopen 240"'
+ )
# 2.4 check config (poll output)
logger.info("Checking BGP neighbor configuration after setting DelayOpenTime on r1")
router = tgen.gears["r1"]
reffile = os.path.join(CWD, "r1/bgp_delayopen_neighbor.json")
expected = json.loads(open(reffile).read())
- test_func = functools.partial(topotest.router_json_cmp, router, "show bgp neighbors json", expected)
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show bgp neighbors json", expected
+ )
_, res = topotest.run_and_expect(test_func, None, count=3, wait=1)
assertmsg = "BGP session on r1 failed to set DelayOpenTime for r4"
assert res is None, assertmsg
# 2.5 disable peering shutdown
logger.info("Disable shutdown of peering between r1 and r4")
- tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"')
- tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"')
+ tgen.net["r1"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"'
+ )
+ tgen.net["r4"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"'
+ )
# 2.6 wait for peers to establish connection (poll output)
for router_num in [1, 4]:
- logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num))
+ logger.info(
+ "Checking BGP summary after disabling shutdown of peering on r{}".format(
+ router_num
+ )
+ )
router = tgen.gears["r{}".format(router_num)]
- reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num))
+ reffile = os.path.join(
+ CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num)
+ )
expected = json.loads(open(reffile).read())
- test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected)
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show ip bgp summary json", expected
+ )
_, res = topotest.run_and_expect(test_func, None, count=5, wait=1)
- assertmsg = "BGP session on r{} did not establish a connection with peer".format(router_num)
+ assertmsg = (
+ "BGP session on r{} did not establish a connection with peer".format(
+ router_num
+ )
+ )
assert res is None, assertmsg
# 2.7 unset delayopen on R1
logger.info("Disabling DelayOpenTimer for neighbor r4 on r1")
- tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 timers delayopen"')
+ tgen.net["r1"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 timers delayopen"'
+ )
# 2.8 check config (poll output)
- logger.info("Checking BGP neighbor configuration after disabling DelayOpenTimer on r1")
- delayopen_cfg = tgen.net["r1"].cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"').rstrip()
+ logger.info(
+ "Checking BGP neighbor configuration after disabling DelayOpenTimer on r1"
+ )
+ delayopen_cfg = (
+ tgen.net["r1"]
+ .cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"')
+ .rstrip()
+ )
assertmsg = "BGP session on r1 failed disable DelayOpenTimer for peer r4"
assert delayopen_cfg == "", assertmsg
- #tgen.mininet_cli()
+ # tgen.mininet_cli()
# end test_bgp_delayopen_singular
@@ -870,67 +942,119 @@ def test_bgp_delayopen_dual():
pytest.skip(tgen.errors)
# part 3: delay 60s R2 <=> delay 30s R5
- logger.info("Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on both sides of the peering with different timer intervals")
+ logger.info(
+ "Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on both sides of the peering with different timer intervals"
+ )
# 3.1 enable peering shutdown
logger.info("Enable shutdown of peering between r2 and r5")
- tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 shutdown"')
- tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 shutdown"')
+ tgen.net["r2"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 shutdown"'
+ )
+ tgen.net["r5"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 shutdown"'
+ )
# 3.2 wait for peers to shut down (pool output)
for router_num in [2, 5]:
- logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num))
+ logger.info(
+ "Checking BGP summary after disabling shutdown of peering on r{}".format(
+ router_num
+ )
+ )
router = tgen.gears["r{}".format(router_num)]
- reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num))
+ reffile = os.path.join(
+ CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num)
+ )
expected = json.loads(open(reffile).read())
- test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected)
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show ip bgp summary json", expected
+ )
_, res = topotest.run_and_expect(test_func, None, count=3, wait=1)
assertmsg = "BGP session on r{} did not shut down peer".format(router_num)
assert res is None, assertmsg
# 3.3 set delayopen on R2 to 60s and on R5 to 30s
logger.info("Setting DelayOpenTime for neighbor r5 to 60 seconds on r2")
- tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 timers delayopen 60"')
+ tgen.net["r2"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 timers delayopen 60"'
+ )
logger.info("Setting DelayOpenTime for neighbor r2 to 30 seconds on r5")
- tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 timers delayopen 30"')
+ tgen.net["r5"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 timers delayopen 30"'
+ )
# 3.4 check config (poll output)
for router_num in [2, 5]:
- logger.info("Checking BGP neighbor configuration after setting DelayOpenTime on r{}i".format(router_num))
+ logger.info(
+ "Checking BGP neighbor configuration after setting DelayOpenTime on r{}i".format(
+ router_num
+ )
+ )
router = tgen.gears["r{}".format(router_num)]
- reffile = os.path.join(CWD, "r{}/bgp_delayopen_neighbor.json".format(router_num))
+ reffile = os.path.join(
+ CWD, "r{}/bgp_delayopen_neighbor.json".format(router_num)
+ )
expected = json.loads(open(reffile).read())
- test_func = functools.partial(topotest.router_json_cmp, router, "show bgp neighbors json", expected)
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show bgp neighbors json", expected
+ )
_, res = topotest.run_and_expect(test_func, None, count=3, wait=1)
assertmsg = "BGP session on r{} failed to set DelayOpenTime".format(router_num)
assert res is None, assertmsg
## 3.5 disable peering shutdown
logger.info("Disable shutdown of peering between r2 and r5")
- tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 shutdown"')
- tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 shutdown"')
+ tgen.net["r2"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 shutdown"'
+ )
+ tgen.net["r5"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 shutdown"'
+ )
## 3.6 wait for peers to reach connect or active state (poll output)
delay_start = int(time.time())
for router_num in [2, 5]:
- logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num))
+ logger.info(
+ "Checking BGP summary after disabling shutdown of peering on r{}".format(
+ router_num
+ )
+ )
router = tgen.gears["r{}".format(router_num)]
- reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_connect.json".format(router_num))
+ reffile = os.path.join(
+ CWD, "r{}/bgp_delayopen_summary_connect.json".format(router_num)
+ )
expected = json.loads(open(reffile).read())
- test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected)
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show ip bgp summary json", expected
+ )
_, res = topotest.run_and_expect(test_func, None, count=3, wait=1)
- assertmsg = "BGP session on r{} did not enter Connect state with peer".format(router_num)
+ assertmsg = "BGP session on r{} did not enter Connect state with peer".format(
+ router_num
+ )
assert res is None, assertmsg
## 3.7 wait for peers to establish connection (poll output)
for router_num in [2, 5]:
- logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num))
+ logger.info(
+ "Checking BGP summary after disabling shutdown of peering on r{}".format(
+ router_num
+ )
+ )
router = tgen.gears["r{}".format(router_num)]
- reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num))
+ reffile = os.path.join(
+ CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num)
+ )
expected = json.loads(open(reffile).read())
- test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected)
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show ip bgp summary json", expected
+ )
_, res = topotest.run_and_expect(test_func, None, count=35, wait=1)
- assertmsg = "BGP session on r{} did not establish a connection with peer".format(router_num)
+ assertmsg = (
+ "BGP session on r{} did not establish a connection with peer".format(
+ router_num
+ )
+ )
assert res is None, assertmsg
delay_stop = int(time.time())
@@ -939,18 +1063,32 @@ def test_bgp_delayopen_dual():
# 3.8 unset delayopen on R2 and R5
logger.info("Disabling DelayOpenTimer for neighbor r5 on r2")
- tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 timers delayopen"')
+ tgen.net["r2"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 timers delayopen"'
+ )
logger.info("Disabling DelayOpenTimer for neighbor r2 on r5")
- tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 timers delayopen"')
+ tgen.net["r5"].cmd(
+ 'vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 timers delayopen"'
+ )
# 3.9 check config (poll output)
for router_num in [2, 5]:
- logger.info("Checking BGP neighbor configuration after disabling DelayOpenTimer on r{}".format(router_num))
- delayopen_cfg = tgen.net["r{}".format(router_num)].cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"').rstrip()
- assertmsg = "BGP session on r{} failed disable DelayOpenTimer".format(router_num)
+ logger.info(
+ "Checking BGP neighbor configuration after disabling DelayOpenTimer on r{}".format(
+ router_num
+ )
+ )
+ delayopen_cfg = (
+ tgen.net["r{}".format(router_num)]
+ .cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"')
+ .rstrip()
+ )
+ assertmsg = "BGP session on r{} failed disable DelayOpenTimer".format(
+ router_num
+ )
assert delayopen_cfg == "", assertmsg
- #tgen.mininet_cli()
+ # tgen.mininet_cli()
# end test_bgp_delayopen_dual
diff --git a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py
index 097b654e77..94fd17e012 100644
--- a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py
+++ b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py
@@ -316,7 +316,9 @@ def test_BGP_GR_TC_46_p1(request):
input_dict = {
"r1": {
"bgp": {
- "graceful-restart": {"graceful-restart": True,},
+ "graceful-restart": {
+ "graceful-restart": True,
+ },
"address_family": {
"ipv4": {
"unicast": {
@@ -1184,8 +1186,8 @@ def test_BGP_GR_TC_53_p1(request):
def test_BGP_GR_TC_4_p0(request):
"""
- Test Objective : Verify that the restarting node sets "R" bit while sending the
- BGP open messages after the node restart, only if GR is enabled.
+ Test Objective : Verify that the restarting node sets "R" bit while sending the
+ BGP open messages after the node restart, only if GR is enabled.
"""
tgen = get_topogen()
@@ -1368,9 +1370,9 @@ def test_BGP_GR_TC_4_p0(request):
def test_BGP_GR_TC_5_1_2_p1(request):
"""
- Test Objective : Verify if restarting node resets R bit in BGP open message
- during normal BGP session flaps as well, even when GR restarting mode is enabled.
- Here link flap happen due to interface UP/DOWN.
+ Test Objective : Verify if restarting node resets R bit in BGP open message
+ during normal BGP session flaps as well, even when GR restarting mode is enabled.
+ Here link flap happen due to interface UP/DOWN.
"""
tgen = get_topogen()
@@ -1815,8 +1817,8 @@ def test_BGP_GR_TC_6_1_2_p1(request):
def test_BGP_GR_TC_8_p1(request):
"""
- Test Objective : Verify that restarting nodes set "F" bit while sending
- the BGP open messages after it restarts, only when BGP GR is enabled.
+ Test Objective : Verify that restarting nodes set "F" bit while sending
+ the BGP open messages after it restarts, only when BGP GR is enabled.
"""
tgen = get_topogen()
@@ -1959,8 +1961,8 @@ def test_BGP_GR_TC_8_p1(request):
def test_BGP_GR_TC_17_p1(request):
"""
- Test Objective : Verify that only GR helper routers keep the stale
- route entries, not any GR disabled router.
+ Test Objective : Verify that only GR helper routers keep the stale
+ route entries, not any GR disabled router.
"""
tgen = get_topogen()
@@ -2145,8 +2147,8 @@ def test_BGP_GR_TC_17_p1(request):
def test_BGP_GR_TC_19_p1(request):
"""
- Test Objective : Verify that GR helper routers keeps all the routes received
- from restarting node if both the routers are configured as GR restarting node.
+ Test Objective : Verify that GR helper routers keeps all the routes received
+ from restarting node if both the routers are configured as GR restarting node.
"""
tgen = get_topogen()
@@ -2325,8 +2327,8 @@ def test_BGP_GR_TC_19_p1(request):
def test_BGP_GR_TC_20_p1(request):
"""
- Test Objective : Verify that GR helper routers delete all the routes
- received from a node if both the routers are configured as GR helper node.
+ Test Objective : Verify that GR helper routers delete all the routes
+ received from a node if both the routers are configured as GR helper node.
"""
tgen = get_topogen()
tc_name = request.node.name
@@ -3090,8 +3092,8 @@ def test_BGP_GR_TC_31_2_p1(request):
def test_BGP_GR_TC_9_p1(request):
"""
- Test Objective : Verify that restarting nodes reset "F" bit while sending
- the BGP open messages after it's restarts, when BGP GR is **NOT** enabled.
+ Test Objective : Verify that restarting nodes reset "F" bit while sending
+ the BGP open messages after it's restarts, when BGP GR is **NOT** enabled.
"""
tgen = get_topogen()
@@ -3264,8 +3266,8 @@ def test_BGP_GR_TC_9_p1(request):
def test_BGP_GR_TC_17_p1(request):
"""
- Test Objective : Verify that only GR helper routers keep the stale
- route entries, not any GR disabled router.
+ Test Objective : Verify that only GR helper routers keep the stale
+ route entries, not any GR disabled router.
"""
tgen = get_topogen()
@@ -3467,7 +3469,13 @@ def test_BGP_GR_TC_43_p1(request):
step("Configure R1 and R2 as GR restarting node in global level")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
}
@@ -3560,7 +3568,13 @@ def test_BGP_GR_TC_43_p1(request):
step("Verify on R2 that R1 doesn't advertise any GR capabilities")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart-disable": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart-disable": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
}
@@ -3659,7 +3673,13 @@ def test_BGP_GR_TC_43_p1(request):
step("Verify on R2 that R1 advertises GR capabilities as a restarting node")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
}
@@ -3779,7 +3799,13 @@ def test_BGP_GR_TC_44_p1(request):
step("Verify on R2 that R1 advertises GR capabilities as a helper node")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart-helper": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
}
@@ -3849,7 +3875,13 @@ def test_BGP_GR_TC_44_p1(request):
start_router_daemons(tgen, "r2", ["bgpd"])
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart-disable": True,}}}
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart-disable": True,
+ }
+ }
+ }
}
configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
@@ -3857,7 +3889,13 @@ def test_BGP_GR_TC_44_p1(request):
step("Verify on R2 that R1 doesn't advertise any GR capabilities")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart-disable": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart-disable": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
}
@@ -3941,7 +3979,13 @@ def test_BGP_GR_TC_44_p1(request):
step("Verify on R2 that R1 advertises GR capabilities as a helper node")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart-helper": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
}
@@ -4108,14 +4152,28 @@ def test_BGP_GR_TC_45_p1(request):
start_router_daemons(tgen, "r1", ["bgpd"])
- input_dict = {"r1": {"bgp": {"graceful-restart": {"graceful-restart": False,}}}}
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": False,
+ }
+ }
+ }
+ }
configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
step("Verify on R2 that R1 advertises GR capabilities as a helper node")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart-helper": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
}
@@ -4199,14 +4257,28 @@ def test_BGP_GR_TC_45_p1(request):
start_router_daemons(tgen, "r2", ["bgpd"])
- input_dict = {"r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}}
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ }
+ }
+ }
+ }
configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
step("Verify on R2 that R1 advertises GR capabilities as a restarting node")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
}
@@ -4307,7 +4379,9 @@ def test_BGP_GR_TC_46_p1(request):
input_dict = {
"r1": {
"bgp": {
- "graceful-restart": {"graceful-restart": True,},
+ "graceful-restart": {
+ "graceful-restart": True,
+ },
"address_family": {
"ipv4": {
"unicast": {
@@ -4559,7 +4633,9 @@ def test_BGP_GR_TC_47_p1(request):
input_dict = {
"r1": {
"bgp": {
- "graceful-restart": {"graceful-restart": True,},
+ "graceful-restart": {
+ "graceful-restart": True,
+ },
"address_family": {
"ipv4": {
"unicast": {
@@ -4698,7 +4774,13 @@ def test_BGP_GR_TC_47_p1(request):
step("Verify on R2 that R1 still advertises GR capabilities as a restarting node")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
}
@@ -4814,7 +4896,9 @@ def test_BGP_GR_TC_48_p1(request):
input_dict = {
"r1": {
"bgp": {
- "graceful-restart": {"graceful-restart": True,},
+ "graceful-restart": {
+ "graceful-restart": True,
+ },
"address_family": {
"ipv4": {
"unicast": {
@@ -4960,7 +5044,13 @@ def test_BGP_GR_TC_48_p1(request):
step("Verify on R2 that R1 advertises GR capabilities as a restarting node")
input_dict = {
- "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}},
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ }
+ }
+ },
"r2": {"bgp": {"graceful-restart": {"graceful-restart-helper": True}}},
}
diff --git a/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py b/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py
index 6926121a6b..2ddeab13f6 100644
--- a/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py
+++ b/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py
@@ -456,7 +456,9 @@ def test_BGP_GR_TC_3_p0(request):
input_dict = {
"r1": {
"bgp": {
- "graceful-restart": {"disable-eor": True,},
+ "graceful-restart": {
+ "disable-eor": True,
+ },
"address_family": {
"ipv4": {
"unicast": {
@@ -2095,7 +2097,10 @@ def test_BGP_GR_chaos_33_p1(request):
"ipv4": {
"unicast": {
"advertise_networks": [
- {"network": "200.0.20.1/32", "no_of_network": 2,}
+ {
+ "network": "200.0.20.1/32",
+ "no_of_network": 2,
+ }
]
}
},
@@ -2207,13 +2212,13 @@ def test_BGP_GR_chaos_33_p1(request):
else:
next_hop_6 = NEXT_HOP_6[1]
- result = verify_rib(tgen, addr_type, dut, input_dict_2, next_hop_6,
- expected=False)
- assert result is not True,\
- "Testcase {} :Failed \n Error {}". \
- format(tc_name, result)
- logger.info(" Expected behavior: {}".\
- format(result))
+ result = verify_rib(
+ tgen, addr_type, dut, input_dict_2, next_hop_6, expected=False
+ )
+ assert result is not True, "Testcase {} :Failed \n Error {}".format(
+ tc_name, result
+ )
+ logger.info(" Expected behavior: {}".format(result))
logger.info("[Step 4] : Start BGPd daemon on R1 and R4..")
@@ -3960,7 +3965,13 @@ def test_BGP_GR_21_p2(request):
}
}
},
- "r2": {"bgp": {"graceful-restart": {"graceful-restart": True,}}},
+ "r2": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ }
+ }
+ },
}
configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
diff --git a/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py b/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py
index 9c0355a3e9..c2858a4bd0 100644
--- a/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py
+++ b/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py
@@ -2132,7 +2132,11 @@ def test_large_community_lists_with_rmap_match_regex(request):
{
"action": "permit",
"seq_id": "10",
- "match": {"large_community_list": {"id": "ALL",},},
+ "match": {
+ "large_community_list": {
+ "id": "ALL",
+ },
+ },
}
]
}
@@ -2208,7 +2212,11 @@ def test_large_community_lists_with_rmap_match_regex(request):
{
"action": "permit",
"seq_id": "20",
- "match": {"large_community_list": {"id": "EXP_ALL",},},
+ "match": {
+ "large_community_list": {
+ "id": "EXP_ALL",
+ },
+ },
}
]
}
diff --git a/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py b/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py
index 9703cf8d57..d34446e2ee 100644
--- a/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py
+++ b/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py
@@ -95,7 +95,7 @@ from lib.common_config import (
kill_router_daemons,
start_router_daemons,
stop_router,
- start_router
+ start_router,
)
from lib.topolog import logger
@@ -129,7 +129,7 @@ LOOPBACK_2 = {
"ipv4": "20.20.20.20/32",
"ipv6": "20::20:20/128",
"ipv4_mask": "255.255.255.255",
- "ipv6_mask": None
+ "ipv6_mask": None,
}
MAX_PATHS = 2
@@ -724,16 +724,40 @@ def test_vrf_with_multiple_links_p1(request):
"local_as": "200",
"vrf": "RED_A",
"address_family": {
- "ipv4": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}},
- "ipv6": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}},
+ "ipv4": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": MAX_PATHS,
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": MAX_PATHS,
+ }
+ }
+ },
},
},
{
"local_as": "200",
"vrf": "BLUE_A",
"address_family": {
- "ipv4": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}},
- "ipv6": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}},
+ "ipv4": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": MAX_PATHS,
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": MAX_PATHS,
+ }
+ }
+ },
},
},
]
@@ -2148,7 +2172,7 @@ def test_restart_bgpd_daemon_p1(request):
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
result = verify_bgp_convergence(tgen, topo)
- assert result is True, "Testcase () :Failed\n Error {}". format(tc_name, result)
+ assert result is True, "Testcase () :Failed\n Error {}".format(tc_name, result)
step("Kill BGPd daemon on R1.")
kill_router_daemons(tgen, "r1", ["bgpd"])
diff --git a/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py b/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py
index 6d7131e1e5..ceac84709b 100644
--- a/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py
+++ b/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py
@@ -101,7 +101,11 @@ def test_r1_receive_and_advertise_prefix_sid_type1():
"prefix": prefix,
"advertisedTo": {"10.0.0.101": {}, "10.0.0.102": {}},
"paths": [
- {"valid": True, "remoteLabel": remoteLabel, "labelIndex": labelIndex,}
+ {
+ "valid": True,
+ "remoteLabel": remoteLabel,
+ "labelIndex": labelIndex,
+ }
],
}
return topotest.json_cmp(output, expected)
diff --git a/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py b/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py
index 3af944473d..3f9009967d 100644
--- a/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py
+++ b/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py
@@ -1042,9 +1042,10 @@ def test_next_hop_with_recursive_lookup_p1(request):
next_hop=next_hop,
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "Route is still present \n Error : {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Route is still present \n Error : {}".format(
+ tc_name, result
)
step("Re-apply redistribution on R4.")
@@ -1125,9 +1126,10 @@ def test_next_hop_with_recursive_lookup_p1(request):
next_hop=next_hop,
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "Route is still present \n Error : {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Route is still present \n Error : {}".format(
+ tc_name, result
)
shutdown_bringup_interface(tgen, "r3", intf_r3_r4, True)
@@ -1182,9 +1184,10 @@ def test_next_hop_with_recursive_lookup_p1(request):
next_hop=next_hop,
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "Route is still present \n Error : {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Route is still present \n Error : {}".format(
+ tc_name, result
)
shutdown_bringup_interface(tgen, "r4", intf_r4_r3, True)
@@ -2101,8 +2104,20 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request):
"r4": {
"bgp": {
"address_family": {
- "ipv4": {"unicast": {"maximum_paths": {"ebgp": 1,}}},
- "ipv6": {"unicast": {"maximum_paths": {"ebgp": 1,}}},
+ "ipv4": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": 1,
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": 1,
+ }
+ }
+ },
}
}
}
@@ -2131,8 +2146,20 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request):
"r4": {
"bgp": {
"address_family": {
- "ipv4": {"unicast": {"maximum_paths": {"ebgp": 2,}}},
- "ipv6": {"unicast": {"maximum_paths": {"ebgp": 2,}}},
+ "ipv4": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": 2,
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "maximum_paths": {
+ "ebgp": 2,
+ }
+ }
+ },
}
}
}
diff --git a/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py b/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py
index 0fabd90341..0467bf1bfb 100644
--- a/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py
+++ b/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py
@@ -415,9 +415,10 @@ def test_route_summarisation_with_summary_only_p1(request):
result = verify_rib(
tgen, addr_type, "r3", input_static, protocol="bgp", expected=False
)
- assert result is not True, (
- "Testcase : Failed \n "
- "Routes are still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase : Failed \n " "Routes are still present \n Error: {}".format(
+ tc_name, result
)
result = verify_rib(tgen, addr_type, "r1", input_static_agg, protocol="bgp")
@@ -614,7 +615,9 @@ def test_route_summarisation_with_summary_only_p1(request):
addr_type: {
"unicast": {
"advertise_networks": [
- {"network": NETWORK_4_1[addr_type],}
+ {
+ "network": NETWORK_4_1[addr_type],
+ }
]
}
}
@@ -1014,7 +1017,11 @@ def test_route_summarisation_with_as_set_p1(request):
assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result)
for addr_type in ADDR_TYPES:
- for pfx, seq_id, network, in zip([6, 7], [60, 70], [NETWORK_3_1, NETWORK_4_1]):
+ for (
+ pfx,
+ seq_id,
+ network,
+ ) in zip([6, 7], [60, 70], [NETWORK_3_1, NETWORK_4_1]):
prefix_list = {
"r1": {
"prefix_lists": {
diff --git a/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py b/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py
index cf8be5f44f..c75055c26f 100644
--- a/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py
+++ b/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py
@@ -56,6 +56,7 @@ class TemplateTopo(Topo):
switch.add_link(tgen.gears["r2"])
switch.add_link(tgen.gears["r3"])
+
def setup_module(mod):
tgen = Topogen(TemplateTopo, mod.__name__)
tgen.start_topology()
@@ -114,6 +115,7 @@ def test_bgp_route():
assertmsg = '"r3" JSON output mismatches'
assert result is None, assertmsg
+
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
diff --git a/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py b/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py
index 9106c163cd..6e7495d929 100644
--- a/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py
+++ b/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py
@@ -943,9 +943,10 @@ def test_modify_route_map_match_set_clauses_p1(request):
}
result = verify_bgp_rib(tgen, addr_type, "r1", input_routes_r1, expected=False)
- assert result is not True, (
- "Testcase {} : Failed \n Error : Routes are still "
- "present {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Error : Routes are still " "present {}".format(
+ tc_name, result
)
write_test_footer(tc_name)
diff --git a/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py b/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py
index 67edcae90e..6f80ffd1aa 100755
--- a/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py
+++ b/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py
@@ -62,7 +62,7 @@ from functools import partial
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
-sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, "../"))
# pylint: disable=C0413
# Import topogen and topotest helpers
@@ -76,8 +76,10 @@ from mininet.topo import Topo
# Global multi-dimensional dictionary containing all expected outputs
outputs = {}
+
class TemplateTopo(Topo):
"Test topology builder"
+
def build(self, *_args, **_opts):
"Build function"
tgen = get_topogen(self)
@@ -85,59 +87,58 @@ class TemplateTopo(Topo):
#
# Define FRR Routers
#
- for router in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']:
+ for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]:
tgen.add_router(router)
#
# Define connections
#
- switch = tgen.add_switch('s1')
- switch.add_link(tgen.gears['rt1'], nodeif="eth-rt2")
- switch.add_link(tgen.gears['rt2'], nodeif="eth-rt1")
- switch = tgen.add_switch('s2')
- switch.add_link(tgen.gears['rt2'], nodeif="eth-rt3")
- switch.add_link(tgen.gears['rt3'], nodeif="eth-rt2")
- switch = tgen.add_switch('s3')
- switch.add_link(tgen.gears['rt1'], nodeif="eth-rt3")
- switch.add_link(tgen.gears['rt3'], nodeif="eth-rt1")
- switch = tgen.add_switch('s4')
- switch.add_link(tgen.gears['rt1'], nodeif="eth-rt4")
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt1")
- switch = tgen.add_switch('s5')
- switch.add_link(tgen.gears['rt1'], nodeif="eth-rt5")
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt1")
- switch = tgen.add_switch('s6')
- switch.add_link(tgen.gears['rt1'], nodeif="eth-rt6")
- switch.add_link(tgen.gears['rt6'], nodeif="eth-rt1")
- switch = tgen.add_switch('s7')
- switch.add_link(tgen.gears['rt2'], nodeif="eth-rt7")
- switch.add_link(tgen.gears['rt7'], nodeif="eth-rt2")
- switch = tgen.add_switch('s8')
- switch.add_link(tgen.gears['rt3'], nodeif="eth-rt7")
- switch.add_link(tgen.gears['rt7'], nodeif="eth-rt3")
- switch = tgen.add_switch('s9')
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt7")
- switch.add_link(tgen.gears['rt7'], nodeif="eth-rt4")
- switch = tgen.add_switch('s10')
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt7")
- switch.add_link(tgen.gears['rt7'], nodeif="eth-rt5")
- switch = tgen.add_switch('s11')
- switch.add_link(tgen.gears['rt6'], nodeif="eth-rt7")
- switch.add_link(tgen.gears['rt7'], nodeif="eth-rt6")
+ switch = tgen.add_switch("s1")
+ switch.add_link(tgen.gears["rt1"], nodeif="eth-rt2")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-rt1")
+ switch = tgen.add_switch("s2")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-rt3")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt2")
+ switch = tgen.add_switch("s3")
+ switch.add_link(tgen.gears["rt1"], nodeif="eth-rt3")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt1")
+ switch = tgen.add_switch("s4")
+ switch.add_link(tgen.gears["rt1"], nodeif="eth-rt4")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt1")
+ switch = tgen.add_switch("s5")
+ switch.add_link(tgen.gears["rt1"], nodeif="eth-rt5")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt1")
+ switch = tgen.add_switch("s6")
+ switch.add_link(tgen.gears["rt1"], nodeif="eth-rt6")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt1")
+ switch = tgen.add_switch("s7")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-rt7")
+ switch.add_link(tgen.gears["rt7"], nodeif="eth-rt2")
+ switch = tgen.add_switch("s8")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt7")
+ switch.add_link(tgen.gears["rt7"], nodeif="eth-rt3")
+ switch = tgen.add_switch("s9")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt7")
+ switch.add_link(tgen.gears["rt7"], nodeif="eth-rt4")
+ switch = tgen.add_switch("s10")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt7")
+ switch.add_link(tgen.gears["rt7"], nodeif="eth-rt5")
+ switch = tgen.add_switch("s11")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt7")
+ switch.add_link(tgen.gears["rt7"], nodeif="eth-rt6")
#
# Populate multi-dimensional dictionary containing all expected outputs
#
- files = ["show_ipv6_route.ref",
- "show_yang_interface_isis_adjacencies.ref"]
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']:
+ files = ["show_ipv6_route.ref", "show_yang_interface_isis_adjacencies.ref"]
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]:
outputs[rname] = {}
for step in range(1, 13 + 1):
outputs[rname][step] = {}
for file in files:
if step == 1:
# Get snapshots relative to the expected initial network convergence
- filename = '{}/{}/step{}/{}'.format(CWD, rname, step, file)
+ filename = "{}/{}/step{}/{}".format(CWD, rname, step, file)
outputs[rname][step][file] = open(filename).read()
else:
if rname != "rt1":
@@ -146,20 +147,23 @@ class TemplateTopo(Topo):
continue
# Get diff relative to the previous step
- filename = '{}/{}/step{}/{}.diff'.format(CWD, rname, step, file)
+ filename = "{}/{}/step{}/{}.diff".format(CWD, rname, step, file)
# Create temporary files in order to apply the diff
f_in = tempfile.NamedTemporaryFile()
f_in.write(outputs[rname][step - 1][file])
f_in.flush()
f_out = tempfile.NamedTemporaryFile()
- os.system("patch -s -o %s %s %s" %(f_out.name, f_in.name, filename))
+ os.system(
+ "patch -s -o %s %s %s" % (f_out.name, f_in.name, filename)
+ )
# Store the updated snapshot and remove the temporary files
outputs[rname][step][file] = open(f_out.name).read()
f_in.close()
f_out.close()
+
def setup_module(mod):
"Sets up the pytest environment"
tgen = Topogen(TemplateTopo, mod.__name__)
@@ -170,16 +174,15 @@ def setup_module(mod):
# For all registered routers, load the zebra configuration file
for rname, router in router_list.iteritems():
router.load_config(
- TopoRouter.RD_ZEBRA,
- os.path.join(CWD, '{}/zebra.conf'.format(rname))
+ TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
)
router.load_config(
- TopoRouter.RD_ISIS,
- os.path.join(CWD, '{}/isisd.conf'.format(rname))
+ TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
)
tgen.start_router()
+
def teardown_module(mod):
"Teardown the pytest environment"
tgen = get_topogen()
@@ -187,6 +190,7 @@ def teardown_module(mod):
# This function tears down the whole topology.
tgen.stop_topology()
+
def router_compare_json_output(rname, command, reference):
"Compare router JSON output"
@@ -196,12 +200,12 @@ def router_compare_json_output(rname, command, reference):
expected = json.loads(reference)
# Run test function until we get an result. Wait at most 60 seconds.
- test_func = partial(topotest.router_json_cmp,
- tgen.gears[rname], command, expected)
+ test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected)
_, diff = topotest.run_and_expect(test_func, None, count=120, wait=0.5)
assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)
assert diff is None, assertmsg
+
#
# Step 1
#
@@ -215,9 +219,13 @@ def test_isis_adjacencies_step1():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']:
- router_compare_json_output(rname, "show yang operational-data /frr-interface:lib isisd",
- outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]:
+ router_compare_json_output(
+ rname,
+ "show yang operational-data /frr-interface:lib isisd",
+ outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"],
+ )
+
def test_rib_ipv6_step1():
logger.info("Test (step 1): verify IPv6 RIB")
@@ -227,9 +235,11 @@ def test_rib_ipv6_step1():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][1]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][1]["show_ipv6_route.ref"]
+ )
+
#
# Step 2
@@ -248,16 +258,28 @@ def test_rib_ipv6_step2():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Disabling LFA protection on all rt1 interfaces')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt3" -c "no isis fast-reroute lfa"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt4" -c "no isis fast-reroute lfa"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt5" -c "no isis fast-reroute lfa"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt6" -c "no isis fast-reroute lfa"')
+ logger.info("Disabling LFA protection on all rt1 interfaces")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt3" -c "no isis fast-reroute lfa"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt4" -c "no isis fast-reroute lfa"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt5" -c "no isis fast-reroute lfa"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt6" -c "no isis fast-reroute lfa"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][2]["show_ipv6_route.ref"]
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][2]["show_ipv6_route.ref"])
#
# Step 3
@@ -276,16 +298,28 @@ def test_rib_ipv6_step3():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Re-enabling LFA protection on all rt1 interfaces')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt3" -c "isis fast-reroute lfa"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt4" -c "isis fast-reroute lfa"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt5" -c "isis fast-reroute lfa"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt6" -c "isis fast-reroute lfa"')
+ logger.info("Re-enabling LFA protection on all rt1 interfaces")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt3" -c "isis fast-reroute lfa"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt4" -c "isis fast-reroute lfa"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt5" -c "isis fast-reroute lfa"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt6" -c "isis fast-reroute lfa"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][3]["show_ipv6_route.ref"]
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][3]["show_ipv6_route.ref"])
#
# Step 4
@@ -304,12 +338,16 @@ def test_rib_ipv6_step4():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Disabling LFA load-sharing on rt1')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute load-sharing disable"')
+ logger.info("Disabling LFA load-sharing on rt1")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute load-sharing disable"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][4]["show_ipv6_route.ref"]
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][4]["show_ipv6_route.ref"])
#
# Step 5
@@ -328,12 +366,16 @@ def test_rib_ipv6_step5():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Re-enabling LFA load-sharing on rt1')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute load-sharing disable"')
+ logger.info("Re-enabling LFA load-sharing on rt1")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute load-sharing disable"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][5]["show_ipv6_route.ref"]
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][5]["show_ipv6_route.ref"])
#
# Step 6
@@ -352,12 +394,16 @@ def test_rib_ipv6_step6():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Limiting backup computation to critical priority prefixes only')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute priority-limit critical"')
+ logger.info("Limiting backup computation to critical priority prefixes only")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute priority-limit critical"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][6]["show_ipv6_route.ref"]
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][6]["show_ipv6_route.ref"])
#
# Step 7
@@ -377,13 +423,19 @@ def test_rib_ipv6_step7():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Configuring a prefix priority list')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "spf prefix-priority critical CRITICAL_DESTINATIONS"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"')
+ logger.info("Configuring a prefix priority list")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "spf prefix-priority critical CRITICAL_DESTINATIONS"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][7]["show_ipv6_route.ref"]
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][7]["show_ipv6_route.ref"])
#
# Step 8
@@ -402,14 +454,22 @@ def test_rib_ipv6_step8():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Reverting previous changes related to prefix priorities')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "no ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute priority-limit critical"')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no spf prefix-priority critical CRITICAL_DESTINATIONS"')
+ logger.info("Reverting previous changes related to prefix priorities")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "no ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute priority-limit critical"'
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no spf prefix-priority critical CRITICAL_DESTINATIONS"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][8]["show_ipv6_route.ref"]
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][8]["show_ipv6_route.ref"])
#
# Step 9
@@ -428,12 +488,16 @@ def test_rib_ipv6_step9():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Excluding eth-rt6 from LFA computation for eth-rt2\'s failure')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa exclude interface eth-rt6"')
+ logger.info("Excluding eth-rt6 from LFA computation for eth-rt2's failure")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa exclude interface eth-rt6"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][9]["show_ipv6_route.ref"]
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][9]["show_ipv6_route.ref"])
#
# Step 10
@@ -452,12 +516,20 @@ def test_rib_ipv6_step10():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Removing exclusion of eth-rt6 from LFA computation for eth-rt2\'s failure')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa exclude interface eth-rt6"')
+ logger.info(
+ "Removing exclusion of eth-rt6 from LFA computation for eth-rt2's failure"
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa exclude interface eth-rt6"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname,
+ "show ipv6 route isis json",
+ outputs[rname][10]["show_ipv6_route.ref"],
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][10]["show_ipv6_route.ref"])
#
# Step 11
@@ -476,12 +548,18 @@ def test_rib_ipv6_step11():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Adding LFA tiebreaker: prefer node protecting backup path')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker node-protecting index 10"')
+ logger.info("Adding LFA tiebreaker: prefer node protecting backup path")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker node-protecting index 10"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname,
+ "show ipv6 route isis json",
+ outputs[rname][11]["show_ipv6_route.ref"],
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][11]["show_ipv6_route.ref"])
#
# Step 12
@@ -500,12 +578,18 @@ def test_rib_ipv6_step12():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Adding LFA tiebreaker: prefer backup path via downstream node')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker downstream index 20"')
+ logger.info("Adding LFA tiebreaker: prefer backup path via downstream node")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker downstream index 20"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname,
+ "show ipv6 route isis json",
+ outputs[rname][12]["show_ipv6_route.ref"],
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][12]["show_ipv6_route.ref"])
#
# Step 13
@@ -524,22 +608,29 @@ def test_rib_ipv6_step13():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Adding LFA tiebreaker: prefer backup path with lowest total metric')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker lowest-backup-metric index 30"')
+ logger.info("Adding LFA tiebreaker: prefer backup path with lowest total metric")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker lowest-backup-metric index 30"'
+ )
+
+ for rname in ["rt1"]:
+ router_compare_json_output(
+ rname,
+ "show ipv6 route isis json",
+ outputs[rname][13]["show_ipv6_route.ref"],
+ )
- for rname in ['rt1']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][13]["show_ipv6_route.ref"])
# Memory leak test template
def test_memory_leak():
"Run the memory leak test and report results."
tgen = get_topogen()
if not tgen.is_memleak_enabled():
- pytest.skip('Memory leak test/report is disabled')
+ pytest.skip("Memory leak test/report is disabled")
tgen.report_memory_leaks()
-if __name__ == '__main__':
+
+if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
diff --git a/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py b/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py
index 514ea53552..a1263de8ad 100755
--- a/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py
+++ b/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py
@@ -74,7 +74,7 @@ from functools import partial
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
-sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, "../"))
# pylint: disable=C0413
# Import topogen and topotest helpers
@@ -88,8 +88,10 @@ from mininet.topo import Topo
# Global multi-dimensional dictionary containing all expected outputs
outputs = {}
+
class TemplateTopo(Topo):
"Test topology builder"
+
def build(self, *_args, **_opts):
"Build function"
tgen = get_topogen(self)
@@ -97,80 +99,85 @@ class TemplateTopo(Topo):
#
# Define FRR Routers
#
- for router in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
+ for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
tgen.add_router(router)
#
# Define connections
#
- switch = tgen.add_switch('s1')
- switch.add_link(tgen.gears['rt1'], nodeif="eth-sw1")
- switch.add_link(tgen.gears['rt2'], nodeif="eth-sw1")
- switch.add_link(tgen.gears['rt3'], nodeif="eth-sw1")
+ switch = tgen.add_switch("s1")
+ switch.add_link(tgen.gears["rt1"], nodeif="eth-sw1")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-sw1")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-sw1")
- switch = tgen.add_switch('s2')
- switch.add_link(tgen.gears['rt2'], nodeif="eth-rt4-1")
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt2-1")
+ switch = tgen.add_switch("s2")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-1")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-1")
- switch = tgen.add_switch('s3')
- switch.add_link(tgen.gears['rt2'], nodeif="eth-rt4-2")
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt2-2")
+ switch = tgen.add_switch("s3")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-2")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-2")
- switch = tgen.add_switch('s4')
- switch.add_link(tgen.gears['rt3'], nodeif="eth-rt5-1")
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt3-1")
+ switch = tgen.add_switch("s4")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-1")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-1")
- switch = tgen.add_switch('s5')
- switch.add_link(tgen.gears['rt3'], nodeif="eth-rt5-2")
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt3-2")
+ switch = tgen.add_switch("s5")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-2")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-2")
- switch = tgen.add_switch('s6')
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt5")
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt4")
+ switch = tgen.add_switch("s6")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt5")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt4")
- switch = tgen.add_switch('s7')
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt6")
- switch.add_link(tgen.gears['rt6'], nodeif="eth-rt4")
+ switch = tgen.add_switch("s7")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt6")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt4")
- switch = tgen.add_switch('s8')
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt6")
- switch.add_link(tgen.gears['rt6'], nodeif="eth-rt5")
+ switch = tgen.add_switch("s8")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt6")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt5")
#
# Populate multi-dimensional dictionary containing all expected outputs
#
- files = ["show_ip_route.ref",
- "show_ipv6_route.ref",
- "show_mpls_table.ref",
- "show_yang_interface_isis_adjacencies.ref"]
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
+ files = [
+ "show_ip_route.ref",
+ "show_ipv6_route.ref",
+ "show_mpls_table.ref",
+ "show_yang_interface_isis_adjacencies.ref",
+ ]
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
outputs[rname] = {}
for step in range(1, 9 + 1):
outputs[rname][step] = {}
for file in files:
if step == 1:
# Get snapshots relative to the expected initial network convergence
- filename = '{}/{}/step{}/{}'.format(CWD, rname, step, file)
+ filename = "{}/{}/step{}/{}".format(CWD, rname, step, file)
outputs[rname][step][file] = open(filename).read()
else:
if file == "show_yang_interface_isis_adjacencies.ref":
continue
# Get diff relative to the previous step
- filename = '{}/{}/step{}/{}.diff'.format(CWD, rname, step, file)
+ filename = "{}/{}/step{}/{}.diff".format(CWD, rname, step, file)
# Create temporary files in order to apply the diff
f_in = tempfile.NamedTemporaryFile()
f_in.write(outputs[rname][step - 1][file])
f_in.flush()
f_out = tempfile.NamedTemporaryFile()
- os.system("patch -s -o %s %s %s" %(f_out.name, f_in.name, filename))
+ os.system(
+ "patch -s -o %s %s %s" % (f_out.name, f_in.name, filename)
+ )
# Store the updated snapshot and remove the temporary files
outputs[rname][step][file] = open(f_out.name).read()
f_in.close()
f_out.close()
+
def setup_module(mod):
"Sets up the pytest environment"
tgen = Topogen(TemplateTopo, mod.__name__)
@@ -181,16 +188,15 @@ def setup_module(mod):
# For all registered routers, load the zebra configuration file
for rname, router in router_list.items():
router.load_config(
- TopoRouter.RD_ZEBRA,
- os.path.join(CWD, '{}/zebra.conf'.format(rname))
+ TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
)
router.load_config(
- TopoRouter.RD_ISIS,
- os.path.join(CWD, '{}/isisd.conf'.format(rname))
+ TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
)
tgen.start_router()
+
def teardown_module(mod):
"Teardown the pytest environment"
tgen = get_topogen()
@@ -198,6 +204,7 @@ def teardown_module(mod):
# This function tears down the whole topology.
tgen.stop_topology()
+
def router_compare_json_output(rname, command, reference):
"Compare router JSON output"
@@ -207,12 +214,12 @@ def router_compare_json_output(rname, command, reference):
expected = json.loads(reference)
# Run test function until we get an result. Wait at most 60 seconds.
- test_func = partial(topotest.router_json_cmp,
- tgen.gears[rname], command, expected)
+ test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected)
_, diff = topotest.run_and_expect(test_func, None, count=120, wait=0.5)
assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)
assert diff is None, assertmsg
+
#
# Step 1
#
@@ -226,9 +233,13 @@ def test_isis_adjacencies_step1():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show yang operational-data /frr-interface:lib isisd",
- outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname,
+ "show yang operational-data /frr-interface:lib isisd",
+ outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"],
+ )
+
def test_rib_ipv4_step1():
logger.info("Test (step 1): verify IPv4 RIB")
@@ -238,9 +249,11 @@ def test_rib_ipv4_step1():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ip route isis json",
- outputs[rname][1]["show_ip_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ip route isis json", outputs[rname][1]["show_ip_route.ref"]
+ )
+
def test_rib_ipv6_step1():
logger.info("Test (step 1): verify IPv6 RIB")
@@ -250,9 +263,11 @@ def test_rib_ipv6_step1():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][1]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][1]["show_ipv6_route.ref"]
+ )
+
def test_mpls_lib_step1():
logger.info("Test (step 1): verify MPLS LIB")
@@ -262,9 +277,11 @@ def test_mpls_lib_step1():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show mpls table json",
- outputs[rname][1]["show_mpls_table.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show mpls table json", outputs[rname][1]["show_mpls_table.ref"]
+ )
+
#
# Step 2
@@ -283,12 +300,16 @@ def test_rib_ipv4_step2():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Disabling TI-LFA link protection on rt2\'s eth-sw1 interface')
- tgen.net['rt2'].cmd('vtysh -c "conf t" -c "interface eth-sw1" -c "no isis fast-reroute ti-lfa"')
+ logger.info("Disabling TI-LFA link protection on rt2's eth-sw1 interface")
+ tgen.net["rt2"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-sw1" -c "no isis fast-reroute ti-lfa"'
+ )
+
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ip route isis json", outputs[rname][2]["show_ip_route.ref"]
+ )
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ip route isis json",
- outputs[rname][2]["show_ip_route.ref"])
def test_rib_ipv6_step2():
logger.info("Test (step 2): verify IPv6 RIB")
@@ -298,9 +319,11 @@ def test_rib_ipv6_step2():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][2]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][2]["show_ipv6_route.ref"]
+ )
+
def test_mpls_lib_step2():
logger.info("Test (step 2): verify MPLS LIB")
@@ -310,9 +333,11 @@ def test_mpls_lib_step2():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show mpls table json",
- outputs[rname][2]["show_mpls_table.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show mpls table json", outputs[rname][2]["show_mpls_table.ref"]
+ )
+
#
# Step 3
@@ -331,12 +356,16 @@ def test_rib_ipv4_step3():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Enabling TI-LFA link protection on rt2\'s eth-sw1 interface')
- tgen.net['rt2'].cmd('vtysh -c "conf t" -c "interface eth-sw1" -c "isis fast-reroute ti-lfa"')
+ logger.info("Enabling TI-LFA link protection on rt2's eth-sw1 interface")
+ tgen.net["rt2"].cmd(
+ 'vtysh -c "conf t" -c "interface eth-sw1" -c "isis fast-reroute ti-lfa"'
+ )
+
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ip route isis json", outputs[rname][3]["show_ip_route.ref"]
+ )
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ip route isis json",
- outputs[rname][3]["show_ip_route.ref"])
def test_rib_ipv6_step3():
logger.info("Test (step 3): verify IPv6 RIB")
@@ -346,9 +375,11 @@ def test_rib_ipv6_step3():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][3]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][3]["show_ipv6_route.ref"]
+ )
+
def test_mpls_lib_step3():
logger.info("Test (step 3): verify MPLS LIB")
@@ -358,9 +389,11 @@ def test_mpls_lib_step3():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show mpls table json",
- outputs[rname][3]["show_mpls_table.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show mpls table json", outputs[rname][3]["show_mpls_table.ref"]
+ )
+
#
# Step 4
@@ -384,12 +417,16 @@ def test_rib_ipv4_step4():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Disabling SR on rt4')
- tgen.net['rt4'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no segment-routing on"')
+ logger.info("Disabling SR on rt4")
+ tgen.net["rt4"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no segment-routing on"'
+ )
+
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ip route isis json", outputs[rname][4]["show_ip_route.ref"]
+ )
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ip route isis json",
- outputs[rname][4]["show_ip_route.ref"])
def test_rib_ipv6_step4():
logger.info("Test (step 4): verify IPv6 RIB")
@@ -399,9 +436,11 @@ def test_rib_ipv6_step4():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][4]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][4]["show_ipv6_route.ref"]
+ )
+
def test_mpls_lib_step4():
logger.info("Test (step 4): verify MPLS LIB")
@@ -411,9 +450,11 @@ def test_mpls_lib_step4():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show mpls table json",
- outputs[rname][4]["show_mpls_table.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show mpls table json", outputs[rname][4]["show_mpls_table.ref"]
+ )
+
#
# Step 5
@@ -432,12 +473,14 @@ def test_rib_ipv4_step5():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Enabling SR on rt4')
- tgen.net['rt4'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing on"')
+ logger.info("Enabling SR on rt4")
+ tgen.net["rt4"].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing on"')
+
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ip route isis json", outputs[rname][5]["show_ip_route.ref"]
+ )
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ip route isis json",
- outputs[rname][5]["show_ip_route.ref"])
def test_rib_ipv6_step5():
logger.info("Test (step 5): verify IPv6 RIB")
@@ -447,9 +490,11 @@ def test_rib_ipv6_step5():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][5]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][5]["show_ipv6_route.ref"]
+ )
+
def test_mpls_lib_step5():
logger.info("Test (step 5): verify MPLS LIB")
@@ -459,9 +504,11 @@ def test_mpls_lib_step5():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show mpls table json",
- outputs[rname][5]["show_mpls_table.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show mpls table json", outputs[rname][5]["show_mpls_table.ref"]
+ )
+
#
# Step 6
@@ -480,12 +527,16 @@ def test_rib_ipv4_step6():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Changing rt5\'s SRGB')
- tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing global-block 30000 37999"')
+ logger.info("Changing rt5's SRGB")
+ tgen.net["rt5"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing global-block 30000 37999"'
+ )
+
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ip route isis json", outputs[rname][6]["show_ip_route.ref"]
+ )
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ip route isis json",
- outputs[rname][6]["show_ip_route.ref"])
def test_rib_ipv6_step6():
logger.info("Test (step 6): verify IPv6 RIB")
@@ -495,9 +546,11 @@ def test_rib_ipv6_step6():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][6]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][6]["show_ipv6_route.ref"]
+ )
+
def test_mpls_lib_step6():
logger.info("Test (step 6): verify MPLS LIB")
@@ -507,9 +560,11 @@ def test_mpls_lib_step6():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show mpls table json",
- outputs[rname][6]["show_mpls_table.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show mpls table json", outputs[rname][6]["show_mpls_table.ref"]
+ )
+
#
# Step 7
@@ -529,13 +584,19 @@ def test_rib_ipv4_step7():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Deleting rt5\'s Prefix-SIDs')
- tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 5.5.5.5/32 index 50"')
- tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 2001:db8:1000::5/128 index 51"')
+ logger.info("Deleting rt5's Prefix-SIDs")
+ tgen.net["rt5"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 5.5.5.5/32 index 50"'
+ )
+ tgen.net["rt5"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 2001:db8:1000::5/128 index 51"'
+ )
+
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ip route isis json", outputs[rname][7]["show_ip_route.ref"]
+ )
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ip route isis json",
- outputs[rname][7]["show_ip_route.ref"])
def test_rib_ipv6_step7():
logger.info("Test (step 7): verify IPv6 RIB")
@@ -545,9 +606,11 @@ def test_rib_ipv6_step7():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][7]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][7]["show_ipv6_route.ref"]
+ )
+
def test_mpls_lib_step7():
logger.info("Test (step 7): verify MPLS LIB")
@@ -557,9 +620,11 @@ def test_mpls_lib_step7():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show mpls table json",
- outputs[rname][7]["show_mpls_table.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show mpls table json", outputs[rname][7]["show_mpls_table.ref"]
+ )
+
#
# Step 8
@@ -578,13 +643,19 @@ def test_rib_ipv4_step8():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Re-adding rt5\'s Prefix-SIDs')
- tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 50"')
- tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 51"')
+ logger.info("Re-adding rt5's Prefix-SIDs")
+ tgen.net["rt5"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 50"'
+ )
+ tgen.net["rt5"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 51"'
+ )
+
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ip route isis json", outputs[rname][8]["show_ip_route.ref"]
+ )
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ip route isis json",
- outputs[rname][8]["show_ip_route.ref"])
def test_rib_ipv6_step8():
logger.info("Test (step 8): verify IPv6 RIB")
@@ -594,9 +665,11 @@ def test_rib_ipv6_step8():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][8]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][8]["show_ipv6_route.ref"]
+ )
+
def test_mpls_lib_step8():
logger.info("Test (step 8): verify MPLS LIB")
@@ -606,9 +679,11 @@ def test_mpls_lib_step8():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show mpls table json",
- outputs[rname][8]["show_mpls_table.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show mpls table json", outputs[rname][8]["show_mpls_table.ref"]
+ )
+
#
# Step 9
@@ -628,13 +703,19 @@ def test_rib_ipv4_step9():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Re-adding rt5\'s Prefix-SIDs')
- tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 500"')
- tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 501"')
+ logger.info("Re-adding rt5's Prefix-SIDs")
+ tgen.net["rt5"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 500"'
+ )
+ tgen.net["rt5"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 501"'
+ )
+
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ip route isis json", outputs[rname][9]["show_ip_route.ref"]
+ )
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ip route isis json",
- outputs[rname][9]["show_ip_route.ref"])
def test_rib_ipv6_step9():
logger.info("Test (step 9): verify IPv6 RIB")
@@ -644,9 +725,11 @@ def test_rib_ipv6_step9():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show ipv6 route isis json",
- outputs[rname][9]["show_ipv6_route.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show ipv6 route isis json", outputs[rname][9]["show_ipv6_route.ref"]
+ )
+
def test_mpls_lib_step9():
logger.info("Test (step 9): verify MPLS LIB")
@@ -656,19 +739,22 @@ def test_mpls_lib_step9():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
- router_compare_json_output(rname, "show mpls table json",
- outputs[rname][9]["show_mpls_table.ref"])
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
+ router_compare_json_output(
+ rname, "show mpls table json", outputs[rname][9]["show_mpls_table.ref"]
+ )
+
# Memory leak test template
def test_memory_leak():
"Run the memory leak test and report results."
tgen = get_topogen()
if not tgen.is_memleak_enabled():
- pytest.skip('Memory leak test/report is disabled')
+ pytest.skip("Memory leak test/report is disabled")
tgen.report_memory_leaks()
-if __name__ == '__main__':
+
+if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
diff --git a/tests/topotests/ldp-topo1/test_ldp_topo1.py b/tests/topotests/ldp-topo1/test_ldp_topo1.py
index 31adeafbf6..9822686dfc 100644
--- a/tests/topotests/ldp-topo1/test_ldp_topo1.py
+++ b/tests/topotests/ldp-topo1/test_ldp_topo1.py
@@ -647,9 +647,11 @@ def test_zebra_ipv4_routingTable():
else:
print("r%s ok" % i)
- assert failures == 0, (
- "IPv4 Zebra Routing Table verification failed for router r%s:\n%s"
- % (i, diff)
+ assert (
+ failures == 0
+ ), "IPv4 Zebra Routing Table verification failed for router r%s:\n%s" % (
+ i,
+ diff,
)
# Make sure that all daemons are running
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index d81f740548..70c4c061f8 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -1151,7 +1151,7 @@ def generate_ips(network, no_of_ips):
mask = int(start_ipaddr.split("/")[1])
else:
logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr))
- assert(0)
+ assert 0
addr_type = validate_ip_address(start_ip)
if addr_type == "ipv4":
@@ -2612,7 +2612,7 @@ def verify_rib(
tag=None,
metric=None,
fib=None,
- count_only=False
+ count_only=False,
):
"""
Data will be read from input_dict or input JSON file, API will generate
@@ -2804,8 +2804,10 @@ def verify_rib(
"Nexthops are missing for "
"route {} in RIB of router {}: "
"expected {}, found {}\n".format(
- st_rt, dut, len(next_hop),
- len(found_hops)
+ st_rt,
+ dut,
+ len(next_hop),
+ len(found_hops),
)
)
return errormsg
@@ -2852,7 +2854,11 @@ def verify_rib(
errormsg = (
"[DUT: {}]: tag value {}"
" is not matched for"
- " route {} in RIB \n".format(dut, _tag, st_rt,)
+ " route {} in RIB \n".format(
+ dut,
+ _tag,
+ st_rt,
+ )
)
return errormsg
@@ -2869,7 +2875,11 @@ def verify_rib(
errormsg = (
"[DUT: {}]: metric value "
"{} is not matched for "
- "route {} in RIB \n".format(dut, metric, st_rt,)
+ "route {} in RIB \n".format(
+ dut,
+ metric,
+ st_rt,
+ )
)
return errormsg
@@ -2918,7 +2928,9 @@ def verify_rib(
for advertise_network_dict in advertise_network:
if "vrf" in advertise_network_dict:
- cmd = "{} vrf {} json".format(command, advertise_network_dict["vrf"])
+ cmd = "{} vrf {} json".format(
+ command, advertise_network_dict["vrf"]
+ )
else:
cmd = "{} json".format(command)
@@ -2997,6 +3009,7 @@ def verify_rib(
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
+
@retry(attempts=6, wait=2, return_is_str=True)
def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
"""
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index 9f3d4841b0..5c7514a641 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -874,7 +874,11 @@ def verify_ospf_rib(
errormsg = (
"[DUT: {}]: tag value {}"
" is not matched for"
- " route {} in RIB \n".format(dut, _tag, st_rt,)
+ " route {} in RIB \n".format(
+ dut,
+ _tag,
+ st_rt,
+ )
)
return errormsg
@@ -891,7 +895,11 @@ def verify_ospf_rib(
errormsg = (
"[DUT: {}]: metric value "
"{} is not matched for "
- "route {} in RIB \n".format(dut, metric, st_rt,)
+ "route {} in RIB \n".format(
+ dut,
+ metric,
+ st_rt,
+ )
)
return errormsg
diff --git a/tests/topotests/lib/test/test_json.py b/tests/topotests/lib/test/test_json.py
index b85e193d3b..7b3c8593cc 100755
--- a/tests/topotests/lib/test/test_json.py
+++ b/tests/topotests/lib/test/test_json.py
@@ -107,16 +107,25 @@ def test_json_intersect_multilevel_true():
dcomplete = {
"i1": "item1",
"i2": "item2",
- "i3": {"i100": "item100",},
+ "i3": {
+ "i100": "item100",
+ },
"i4": {
- "i41": {"i411": "item411",},
- "i42": {"i421": "item421", "i422": "item422",},
+ "i41": {
+ "i411": "item411",
+ },
+ "i42": {
+ "i421": "item421",
+ "i422": "item422",
+ },
},
}
dsub1 = {
"i1": "item1",
- "i3": {"i100": "item100",},
+ "i3": {
+ "i100": "item100",
+ },
"i10": None,
}
dsub2 = {
@@ -126,10 +135,36 @@ def test_json_intersect_multilevel_true():
}
dsub3 = {
"i2": "item2",
- "i4": {"i41": {"i411": "item411",}, "i42": {"i422": "item422", "i450": None,}},
+ "i4": {
+ "i41": {
+ "i411": "item411",
+ },
+ "i42": {
+ "i422": "item422",
+ "i450": None,
+ },
+ },
+ }
+ dsub4 = {
+ "i2": "item2",
+ "i4": {
+ "i41": {},
+ "i42": {
+ "i450": None,
+ },
+ },
+ }
+ dsub5 = {
+ "i2": "item2",
+ "i3": {
+ "i100": "item100",
+ },
+ "i4": {
+ "i42": {
+ "i450": None,
+ }
+ },
}
- dsub4 = {"i2": "item2", "i4": {"i41": {}, "i42": {"i450": None,}}}
- dsub5 = {"i2": "item2", "i3": {"i100": "item100",}, "i4": {"i42": {"i450": None,}}}
assert json_cmp(dcomplete, dsub1) is None
assert json_cmp(dcomplete, dsub2) is None
@@ -144,17 +179,26 @@ def test_json_intersect_multilevel_false():
dcomplete = {
"i1": "item1",
"i2": "item2",
- "i3": {"i100": "item100",},
+ "i3": {
+ "i100": "item100",
+ },
"i4": {
- "i41": {"i411": "item411",},
- "i42": {"i421": "item421", "i422": "item422",},
+ "i41": {
+ "i411": "item411",
+ },
+ "i42": {
+ "i421": "item421",
+ "i422": "item422",
+ },
},
}
# Incorrect sub-level value
dsub1 = {
"i1": "item1",
- "i3": {"i100": "item00",},
+ "i3": {
+ "i100": "item00",
+ },
"i10": None,
}
# Inexistent sub-level
@@ -166,14 +210,41 @@ def test_json_intersect_multilevel_false():
# Inexistent sub-level value
dsub3 = {
"i1": "item1",
- "i3": {"i100": None,},
+ "i3": {
+ "i100": None,
+ },
}
# Inexistent sub-sub-level value
- dsub4 = {"i4": {"i41": {"i412": "item412",}, "i42": {"i421": "item421",}}}
+ dsub4 = {
+ "i4": {
+ "i41": {
+ "i412": "item412",
+ },
+ "i42": {
+ "i421": "item421",
+ },
+ }
+ }
# Invalid sub-sub-level value
- dsub5 = {"i4": {"i41": {"i411": "item411",}, "i42": {"i421": "item420000",}}}
+ dsub5 = {
+ "i4": {
+ "i41": {
+ "i411": "item411",
+ },
+ "i42": {
+ "i421": "item420000",
+ },
+ }
+ }
# sub-sub-level should be value
- dsub6 = {"i4": {"i41": {"i411": "item411",}, "i42": "foobar",}}
+ dsub6 = {
+ "i4": {
+ "i41": {
+ "i411": "item411",
+ },
+ "i42": "foobar",
+ }
+ }
assert json_cmp(dcomplete, dsub1) is not None
assert json_cmp(dcomplete, dsub2) is not None
@@ -187,7 +258,15 @@ def test_json_with_list_sucess():
"Test successful json comparisons that have lists."
dcomplete = {
- "list": [{"i1": "item 1", "i2": "item 2",}, {"i10": "item 10",},],
+ "list": [
+ {
+ "i1": "item 1",
+ "i2": "item 2",
+ },
+ {
+ "i10": "item 10",
+ },
+ ],
"i100": "item 100",
}
@@ -197,12 +276,19 @@ def test_json_with_list_sucess():
}
# Test list correct list items
dsub2 = {
- "list": [{"i1": "item 1",},],
+ "list": [
+ {
+ "i1": "item 1",
+ },
+ ],
"i100": "item 100",
}
# Test list correct list size
dsub3 = {
- "list": [{}, {},],
+ "list": [
+ {},
+ {},
+ ],
}
assert json_cmp(dcomplete, dsub1) is None
@@ -214,7 +300,15 @@ def test_json_with_list_failure():
"Test failed json comparisons that have lists."
dcomplete = {
- "list": [{"i1": "item 1", "i2": "item 2",}, {"i10": "item 10",},],
+ "list": [
+ {
+ "i1": "item 1",
+ "i2": "item 2",
+ },
+ {
+ "i10": "item 10",
+ },
+ ],
"i100": "item 100",
}
@@ -224,12 +318,20 @@ def test_json_with_list_failure():
}
# Test list incorrect list items
dsub2 = {
- "list": [{"i1": "item 2",},],
+ "list": [
+ {
+ "i1": "item 2",
+ },
+ ],
"i100": "item 100",
}
# Test list correct list size
dsub3 = {
- "list": [{}, {}, {},],
+ "list": [
+ {},
+ {},
+ {},
+ ],
}
assert json_cmp(dcomplete, dsub1) is not None
@@ -241,20 +343,52 @@ def test_json_list_start_success():
"Test JSON encoded data that starts with a list that should succeed."
dcomplete = [
- {"id": 100, "value": "abc",},
- {"id": 200, "value": "abcd",},
- {"id": 300, "value": "abcde",},
+ {
+ "id": 100,
+ "value": "abc",
+ },
+ {
+ "id": 200,
+ "value": "abcd",
+ },
+ {
+ "id": 300,
+ "value": "abcde",
+ },
]
- dsub1 = [{"id": 100, "value": "abc",}]
+ dsub1 = [
+ {
+ "id": 100,
+ "value": "abc",
+ }
+ ]
- dsub2 = [{"id": 100, "value": "abc",}, {"id": 200, "value": "abcd",}]
+ dsub2 = [
+ {
+ "id": 100,
+ "value": "abc",
+ },
+ {
+ "id": 200,
+ "value": "abcd",
+ },
+ ]
- dsub3 = [{"id": 300, "value": "abcde",}]
+ dsub3 = [
+ {
+ "id": 300,
+ "value": "abcde",
+ }
+ ]
dsub4 = []
- dsub5 = [{"id": 100,}]
+ dsub5 = [
+ {
+ "id": 100,
+ }
+ ]
assert json_cmp(dcomplete, dsub1) is None
assert json_cmp(dcomplete, dsub2) is None
@@ -272,13 +406,44 @@ def test_json_list_start_failure():
{"id": 300, "value": "abcde"},
]
- dsub1 = [{"id": 100, "value": "abcd",}]
+ dsub1 = [
+ {
+ "id": 100,
+ "value": "abcd",
+ }
+ ]
- dsub2 = [{"id": 100, "value": "abc",}, {"id": 200, "value": "abc",}]
+ dsub2 = [
+ {
+ "id": 100,
+ "value": "abc",
+ },
+ {
+ "id": 200,
+ "value": "abc",
+ },
+ ]
- dsub3 = [{"id": 100, "value": "abc",}, {"id": 350, "value": "abcde",}]
+ dsub3 = [
+ {
+ "id": 100,
+ "value": "abc",
+ },
+ {
+ "id": 350,
+ "value": "abcde",
+ },
+ ]
- dsub4 = [{"value": "abcx",}, {"id": 300, "value": "abcde",}]
+ dsub4 = [
+ {
+ "value": "abcx",
+ },
+ {
+ "id": 300,
+ "value": "abcde",
+ },
+ ]
assert json_cmp(dcomplete, dsub1) is not None
assert json_cmp(dcomplete, dsub2) is not None
diff --git a/tests/topotests/lib/topogen.py b/tests/topotests/lib/topogen.py
index 86f06b2af7..7c52e824c1 100644
--- a/tests/topotests/lib/topogen.py
+++ b/tests/topotests/lib/topogen.py
@@ -336,7 +336,9 @@ class Topogen(object):
for gear in self.gears.values():
errors += gear.stop()
if len(errors) > 0:
- logger.error("Errors found post shutdown - details follow: {}".format(errors))
+ logger.error(
+ "Errors found post shutdown - details follow: {}".format(errors)
+ )
self.net.stop()
diff --git a/tests/topotests/lib/topotest.py b/tests/topotests/lib/topotest.py
index d1e76866a7..ef0ac27118 100644
--- a/tests/topotests/lib/topotest.py
+++ b/tests/topotests/lib/topotest.py
@@ -609,8 +609,10 @@ def interface_set_status(node, ifacename, ifaceaction=False, vrf_name=None):
ifacename, str_ifaceaction
)
else:
- cmd = 'vtysh -c "configure terminal" -c "interface {0} vrf {1}" -c "{2}"'.format(
- ifacename, vrf_name, str_ifaceaction
+ cmd = (
+ 'vtysh -c "configure terminal" -c "interface {0} vrf {1}" -c "{2}"'.format(
+ ifacename, vrf_name, str_ifaceaction
+ )
)
node.run(cmd)
@@ -924,40 +926,44 @@ def checkAddressSanitizerError(output, router, component, logdir=""):
)
if addressSanitizerLog:
# Find Calling Test. Could be multiple steps back
- testframe=sys._current_frames().values()[0]
- level=0
+ testframe = sys._current_frames().values()[0]
+ level = 0
while level < 10:
- test=os.path.splitext(os.path.basename(testframe.f_globals["__file__"]))[0]
+ test = os.path.splitext(
+ os.path.basename(testframe.f_globals["__file__"])
+ )[0]
if (test != "topotest") and (test != "topogen"):
# Found the calling test
- callingTest=os.path.basename(testframe.f_globals["__file__"])
+ callingTest = os.path.basename(testframe.f_globals["__file__"])
break
- level=level+1
- testframe=testframe.f_back
- if (level >= 10):
+ level = level + 1
+ testframe = testframe.f_back
+ if level >= 10:
# somehow couldn't find the test script.
- callingTest="unknownTest"
+ callingTest = "unknownTest"
#
# Now finding Calling Procedure
- level=0
+ level = 0
while level < 20:
- callingProc=sys._getframe(level).f_code.co_name
- if ((callingProc != "processAddressSanitizerError") and
- (callingProc != "checkAddressSanitizerError") and
- (callingProc != "checkRouterCores") and
- (callingProc != "stopRouter") and
- (callingProc != "__stop_internal") and
- (callingProc != "stop") and
- (callingProc != "stop_topology") and
- (callingProc != "checkRouterRunning") and
- (callingProc != "check_router_running") and
- (callingProc != "routers_have_failure")):
+ callingProc = sys._getframe(level).f_code.co_name
+ if (
+ (callingProc != "processAddressSanitizerError")
+ and (callingProc != "checkAddressSanitizerError")
+ and (callingProc != "checkRouterCores")
+ and (callingProc != "stopRouter")
+ and (callingProc != "__stop_internal")
+ and (callingProc != "stop")
+ and (callingProc != "stop_topology")
+ and (callingProc != "checkRouterRunning")
+ and (callingProc != "check_router_running")
+ and (callingProc != "routers_have_failure")
+ ):
# Found the calling test
break
- level=level+1
- if (level >= 20):
+ level = level + 1
+ if level >= 20:
# something wrong - couldn't found the calling test function
- callingProc="unknownProc"
+ callingProc = "unknownProc"
with open("/tmp/AddressSanitzer.txt", "a") as addrSanFile:
sys.stderr.write(
"AddressSanitizer error in topotest `%s`, test `%s`, router `%s`\n\n"
@@ -979,7 +985,6 @@ def checkAddressSanitizerError(output, router, component, logdir=""):
addrSanFile.write("\n---------------\n")
return
-
addressSanitizerError = re.search(
"(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ", output
)
@@ -989,16 +994,20 @@ def checkAddressSanitizerError(output, router, component, logdir=""):
# No Address Sanitizer Error in Output. Now check for AddressSanitizer daemon file
if logdir:
- filepattern=logdir+"/"+router+"/"+component+".asan.*"
- logger.debug("Log check for %s on %s, pattern %s\n" % (component, router, filepattern))
+ filepattern = logdir + "/" + router + "/" + component + ".asan.*"
+ logger.debug(
+ "Log check for %s on %s, pattern %s\n" % (component, router, filepattern)
+ )
for file in glob.glob(filepattern):
with open(file, "r") as asanErrorFile:
- asanError=asanErrorFile.read()
+ asanError = asanErrorFile.read()
addressSanitizerError = re.search(
"(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ", asanError
- )
+ )
if addressSanitizerError:
- processAddressSanitizerError(addressSanitizerError, asanError, router, component)
+ processAddressSanitizerError(
+ addressSanitizerError, asanError, router, component
+ )
return True
return False
@@ -1065,7 +1074,7 @@ class Router(Node):
if self.logdir is None:
cur_test = os.environ["PYTEST_CURRENT_TEST"]
self.logdir = "/tmp/topotests/" + cur_test[
- cur_test.find("/")+1 : cur_test.find(".py")
+ cur_test.find("/") + 1 : cur_test.find(".py")
].replace("/", ".")
# If the logdir is not created, then create it and set the
@@ -1073,7 +1082,7 @@ class Router(Node):
if not os.path.isdir(self.logdir):
os.system("mkdir -p " + self.logdir + "/" + name)
os.system("chmod -R go+rw /tmp/topotests")
- # Erase logs of previous run
+ # Erase logs of previous run
os.system("rm -rf " + self.logdir + "/" + name)
self.daemondir = None
diff --git a/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py b/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py
index 6d44d02e5e..2421b312d2 100644
--- a/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py
+++ b/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py
@@ -320,8 +320,10 @@ def test_ospf_link_down_kernel_route():
# Run test function until we get an result. Wait at most 60 seconds.
test_func = partial(compare_show_ip_route_vrf, router.name, expected)
result, diff = topotest.run_and_expect(test_func, "", count=140, wait=0.5)
- assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down: {}'.format(
- router.name, diff
+ assertmsg = (
+ 'OSPF IPv4 route mismatch in router "{}" after link down: {}'.format(
+ router.name, diff
+ )
)
assert result, assertmsg
diff --git a/tests/topotests/ospf-topo1/test_ospf_topo1.py b/tests/topotests/ospf-topo1/test_ospf_topo1.py
index 95193afb2a..24806dd8fc 100644
--- a/tests/topotests/ospf-topo1/test_ospf_topo1.py
+++ b/tests/topotests/ospf-topo1/test_ospf_topo1.py
@@ -310,7 +310,10 @@ def test_ospf_json():
# r4 has more interfaces for area 0.0.0.1
if router.name == "r4":
expected["areas"]["0.0.0.1"].update(
- {"areaIfActiveCounter": 2, "areaIfTotalCounter": 2,}
+ {
+ "areaIfActiveCounter": 2,
+ "areaIfTotalCounter": 2,
+ }
)
# router 3 has an additional area
@@ -372,16 +375,25 @@ def test_ospf_link_down_kernel_route():
}
if router.name == "r1" or router.name == "r2":
expected.update(
- {"10.0.10.0/24": None, "172.16.0.0/24": None, "172.16.1.0/24": None,}
+ {
+ "10.0.10.0/24": None,
+ "172.16.0.0/24": None,
+ "172.16.1.0/24": None,
+ }
)
elif router.name == "r3" or router.name == "r4":
expected.update(
- {"10.0.1.0/24": None, "10.0.2.0/24": None,}
+ {
+ "10.0.1.0/24": None,
+ "10.0.2.0/24": None,
+ }
)
# Route '10.0.3.0' is no longer available for r4 since it is down.
if router.name == "r4":
expected.update(
- {"10.0.3.0/24": None,}
+ {
+ "10.0.3.0/24": None,
+ }
)
assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down'.format(
router.name
@@ -443,12 +455,17 @@ def test_ospf6_link_down_kernel_route():
)
elif router.name == "r3" or router.name == "r4":
expected.update(
- {"2001:db8:1::/64": None, "2001:db8:2::/64": None,}
+ {
+ "2001:db8:1::/64": None,
+ "2001:db8:2::/64": None,
+ }
)
# Route '2001:db8:3::/64' is no longer available for r4 since it is down.
if router.name == "r4":
expected.update(
- {"2001:db8:3::/64": None,}
+ {
+ "2001:db8:3::/64": None,
+ }
)
assertmsg = 'OSPF IPv6 route mismatch in router "{}" after link down'.format(
router.name
diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py
index 3b37b8a92f..5ef6b9b0be 100644
--- a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py
+++ b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py
@@ -252,7 +252,11 @@ def test_ospf_ecmp_tc16_p0(request):
input_dict = {
"r0": {
"static_routes": [
- {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",}
+ {
+ "network": NETWORK["ipv4"][0],
+ "no_of_ip": 5,
+ "next_hop": "Null0",
+ }
]
}
}
@@ -415,7 +419,11 @@ def test_ospf_ecmp_tc17_p0(request):
input_dict = {
"r0": {
"static_routes": [
- {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",}
+ {
+ "network": NETWORK["ipv4"][0],
+ "no_of_ip": 5,
+ "next_hop": "Null0",
+ }
]
}
}
diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py b/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py
index 04b1f4b878..88667a6ac8 100644
--- a/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py
+++ b/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py
@@ -197,6 +197,7 @@ def teardown_module(mod):
# Test cases start here.
# ##################################
+
def test_ospf_routemaps_functionality_tc19_p0(request):
"""
OSPF Route map - Verify OSPF route map support functionality.
@@ -215,121 +216,92 @@ def test_ospf_routemaps_functionality_tc19_p0(request):
"r0": {
"static_routes": [
{
- "network": NETWORK['ipv4'][0],
+ "network": NETWORK["ipv4"][0],
"no_of_ip": 5,
- "next_hop": 'Null0',
+ "next_hop": "Null0",
}
]
}
}
result = create_static_routes(tgen, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- ospf_red_r1 = {
- "r0": {
- "ospf": {
- "redistribute": [{
- "redist_type": "static"
- }]
- }
- }
- }
+ ospf_red_r1 = {"r0": {"ospf": {"redistribute": [{"redist_type": "static"}]}}}
result = create_router_ospf(tgen, topo, ospf_red_r1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- dut = 'r1'
- lsid = NETWORK['ipv4'][0].split("/")[0]
- rid = routerids[0]
- protocol = 'ospf'
+ dut = "r1"
+ lsid = NETWORK["ipv4"][0].split("/")[0]
+ rid = routerids[0]
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_red_r1 = {
"r0": {
- "ospf": {
- "redistribute": [{
- "redist_type": "static",
- "del_action": True
- }]
- }
+ "ospf": {"redistribute": [{"redist_type": "static", "del_action": True}]}
}
}
result = create_router_ospf(tgen, topo, ospf_red_r1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step(
- 'Create prefix-list in R0 to permit 10.0.20.1/32 prefix &'
- ' deny 10.0.20.2/32')
+ "Create prefix-list in R0 to permit 10.0.20.1/32 prefix &" " deny 10.0.20.2/32"
+ )
# Create ip prefix list
pfx_list = {
"r0": {
"prefix_lists": {
"ipv4": {
- "pf_list_1_ipv4": [{
- "seqid": 10,
- "network": NETWORK['ipv4'][0],
- "action": "permit"
- },
- {
- "seqid": 11,
- "network": "any",
- "action": "deny"
- }
+ "pf_list_1_ipv4": [
+ {
+ "seqid": 10,
+ "network": NETWORK["ipv4"][0],
+ "action": "permit",
+ },
+ {"seqid": 11, "network": "any", "action": "deny"},
]
}
}
}
}
result = create_prefix_lists(tgen, pfx_list)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
# Create route map
routemaps = {
- "r0": {
- "route_maps": {
- "rmap_ipv4": [{
+ "r0": {
+ "route_maps": {
+ "rmap_ipv4": [
+ {
"action": "permit",
- "match": {
- "ipv4": {
- "prefix_lists":
- "pf_list_1_ipv4"
- }
- }
- }]
- }
+ "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}},
+ }
+ ]
}
+ }
}
result = create_route_maps(tgen, routemaps)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step(
"Configure route map rmap1 and redistribute static routes to"
- " ospf using route map rmap1")
+ " ospf using route map rmap1"
+ )
ospf_red_r1 = {
"r0": {
"ospf": {
- "redistribute": [{
- "redist_type": "static",
- "route_map": "rmap_ipv4"
- }]
+ "redistribute": [{"redist_type": "static", "route_map": "rmap_ipv4"}]
}
}
}
result = create_router_ospf(tgen, topo, ospf_red_r1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Change prefix rules to permit 10.0.20.2 and deny 10.0.20.1")
# Create ip prefix list
@@ -337,65 +309,54 @@ def test_ospf_routemaps_functionality_tc19_p0(request):
"r0": {
"prefix_lists": {
"ipv4": {
- "pf_list_1_ipv4": [{
- "seqid": 10,
- "network": NETWORK['ipv4'][1],
- "action": "permit"
- },
- {
- "seqid": 11,
- "network": "any",
- "action": "deny"
- }
+ "pf_list_1_ipv4": [
+ {
+ "seqid": 10,
+ "network": NETWORK["ipv4"][1],
+ "action": "permit",
+ },
+ {"seqid": 11, "network": "any", "action": "deny"},
]
}
}
}
}
result = create_prefix_lists(tgen, pfx_list)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that route 10.0.20.2 is allowed and 10.0.20.1 is denied.")
- dut = 'r1'
+ dut = "r1"
input_dict = {
"r0": {
"static_routes": [
- {
- "network": NETWORK['ipv4'][1],
- "no_of_ip": 1,
- "next_hop": 'Null0'
- }
+ {"network": NETWORK["ipv4"][1], "no_of_ip": 1, "next_hop": "Null0"}
]
}
}
result = verify_ospf_rib(tgen, dut, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
input_dict = {
"r0": {
"static_routes": [
- {
- "network": NETWORK['ipv4'][0],
- "no_of_ip": 1,
- "next_hop": 'Null0'
- }
+ {"network": NETWORK["ipv4"][0], "no_of_ip": 1, "next_hop": "Null0"}
]
}
}
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- expected=False)
+ result = verify_rib(
+ tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
+ )
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step("Delete and reconfigure prefix list.")
# Create ip prefix list
@@ -403,114 +364,101 @@ def test_ospf_routemaps_functionality_tc19_p0(request):
"r0": {
"prefix_lists": {
"ipv4": {
- "pf_list_1_ipv4": [{
- "seqid": 10,
- "network": NETWORK['ipv4'][1],
- "action": "permit",
- "delete": True
- },
- {
- "seqid": 11,
- "network": "any",
- "action": "deny",
- "delete": True
- }
+ "pf_list_1_ipv4": [
+ {
+ "seqid": 10,
+ "network": NETWORK["ipv4"][1],
+ "action": "permit",
+ "delete": True,
+ },
+ {
+ "seqid": 11,
+ "network": "any",
+ "action": "deny",
+ "delete": True,
+ },
]
}
}
}
}
result = create_prefix_lists(tgen, pfx_list)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_prefix_lists(tgen, pfx_list)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
input_dict = {
"r0": {
"static_routes": [
- {
- "network": NETWORK['ipv4'][0],
- "no_of_ip": 5,
- "next_hop": 'Null0'
- }
+ {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0"}
]
}
}
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- expected=False)
+ result = verify_rib(
+ tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
+ )
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
pfx_list = {
"r0": {
"prefix_lists": {
"ipv4": {
- "pf_list_1_ipv4": [{
- "seqid": 10,
- "network": NETWORK['ipv4'][1],
- "action": "permit"
- },
- {
- "seqid": 11,
- "network": "any",
- "action": "deny"
- }
+ "pf_list_1_ipv4": [
+ {
+ "seqid": 10,
+ "network": NETWORK["ipv4"][1],
+ "action": "permit",
+ },
+ {"seqid": 11, "network": "any", "action": "deny"},
]
}
}
}
}
result = create_prefix_lists(tgen, pfx_list)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that route 10.0.20.2 is allowed and 10.0.20.1 is denied.")
- dut = 'r1'
+ dut = "r1"
input_dict = {
"r0": {
"static_routes": [
- {
- "network": NETWORK['ipv4'][1],
- "no_of_ip": 1,
- "next_hop": 'Null0'
- }
+ {"network": NETWORK["ipv4"][1], "no_of_ip": 1, "next_hop": "Null0"}
]
}
}
result = verify_ospf_rib(tgen, dut, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
input_dict = {
"r0": {
"static_routes": [
- {
- "network": NETWORK['ipv4'][0],
- "no_of_ip": 1,
- "next_hop": 'Null0'
- }
+ {"network": NETWORK["ipv4"][0], "no_of_ip": 1, "next_hop": "Null0"}
]
}
}
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- expected=False)
+ result = verify_rib(
+ tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
+ )
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
write_test_footer(tc_name)
@@ -535,7 +483,11 @@ def test_ospf_routemaps_functionality_tc20_p0(request):
input_dict = {
"r0": {
"static_routes": [
- {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",}
+ {
+ "network": NETWORK["ipv4"][0],
+ "no_of_ip": 5,
+ "next_hop": "Null0",
+ }
]
}
}
@@ -663,318 +615,229 @@ def test_ospf_routemaps_functionality_tc21_p0(request):
step(
"Create static routes(10.0.20.1/32) in R1 and redistribute "
- "to OSPF using route map.")
+ "to OSPF using route map."
+ )
# Create Static routes
input_dict = {
"r0": {
"static_routes": [
{
- "network": NETWORK['ipv4'][0],
+ "network": NETWORK["ipv4"][0],
"no_of_ip": 5,
- "next_hop": 'Null0',
+ "next_hop": "Null0",
}
]
}
}
result = create_static_routes(tgen, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_red_r0 = {
"r0": {
"ospf": {
- "redistribute": [{
- "redist_type": "static",
- "route_map": "rmap_ipv4"
- }]
+ "redistribute": [{"redist_type": "static", "route_map": "rmap_ipv4"}]
}
}
}
result = create_router_ospf(tgen, topo, ospf_red_r0)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
# Create route map
routemaps = {
- "r0": {
- "route_maps": {
- "rmap_ipv4": [{
- "action": "permit",
- "seq_id": 10
- }]
- }
- }
+ "r0": {"route_maps": {"rmap_ipv4": [{"action": "permit", "seq_id": 10}]}}
}
result = create_route_maps(tgen, routemaps)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that route is advertised to R2.")
- dut = 'r1'
- protocol = 'ospf'
+ dut = "r1"
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
# Create route map
routemaps = {
- "r0": {
- "route_maps": {
- "rmap_ipv4": [{
- "action": "permit",
- "delete": True,
- "seq_id": 10
- }]
+ "r0": {
+ "route_maps": {
+ "rmap_ipv4": [{"action": "permit", "delete": True, "seq_id": 10}]
+ }
}
}
- }
result = create_route_maps(tgen, routemaps)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- step(' Configure route map with set clause (set metric)')
+ step(" Configure route map with set clause (set metric)")
# Create route map
routemaps = {
- "r0": {
- "route_maps": {
- "rmap_ipv4": [{
- "action": "permit",
- "set": {
- "med": 123
- },
- "seq_id": 10
- }]
+ "r0": {
+ "route_maps": {
+ "rmap_ipv4": [{"action": "permit", "set": {"med": 123}, "seq_id": 10}]
+ }
}
}
- }
result = create_route_maps(tgen, routemaps)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that configured metric is applied to ospf routes.")
- dut = 'r1'
- protocol = 'ospf'
+ dut = "r1"
+ protocol = "ospf"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step(
"Configure route map with match clause (match metric) with "
- "some actions(change metric).")
+ "some actions(change metric)."
+ )
# Create route map
routemaps = {
- "r0": {
- "route_maps": {
- "rmap_ipv4": [{
- "action": "permit",
- "match": {
- "med": 123
- },
- "set": {
- "med": 150
- },
- "seq_id": 10
- }]
+ "r0": {
+ "route_maps": {
+ "rmap_ipv4": [
+ {
+ "action": "permit",
+ "match": {"med": 123},
+ "set": {"med": 150},
+ "seq_id": 10,
+ }
+ ]
+ }
}
}
- }
result = create_route_maps(tgen, routemaps)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Configure route map with call clause")
# Create ip prefix list
input_dict_2 = {
- 'r0': {
- 'prefix_lists': {
- 'ipv4': {
- 'pf_list_1_ipv4': [{
- 'seqid': 10,
- 'network': 'any',
- 'action': 'permit'
- }]
- }
+ "r0": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1_ipv4": [
+ {"seqid": 10, "network": "any", "action": "permit"}
+ ]
+ }
}
}
}
result = create_prefix_lists(tgen, input_dict_2)
- assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
# Create route map
input_dict_3 = {
- "r0": {
- "route_maps": {
- "rmap_ipv4": [{
- "action": "permit",
- "match": {
- "ipv4": {
- "prefix_lists": "pf_list_1_ipv4"
- }
- },
- "set": {
- "med": 150
- },
- "call": "rmap_match_pf_2_ipv4",
- "seq_id": 10
- }],
- "rmap_match_pf_2_ipv4": [{
- "action": "permit",
- "match": {
- "ipv4": {
- "prefix_lists": "pf_list_1_ipv4"
- }
- },
- "set": {
- "med": 200
- },
- "seq_id": 10
- }]
+ "r0": {
+ "route_maps": {
+ "rmap_ipv4": [
+ {
+ "action": "permit",
+ "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}},
+ "set": {"med": 150},
+ "call": "rmap_match_pf_2_ipv4",
+ "seq_id": 10,
+ }
+ ],
+ "rmap_match_pf_2_ipv4": [
+ {
+ "action": "permit",
+ "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}},
+ "set": {"med": 200},
+ "seq_id": 10,
+ }
+ ],
+ }
}
}
- }
result = create_route_maps(tgen, input_dict_3)
- assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_ospf_rib(tgen, dut, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
# Create route map
- routemaps = {
- "r0": {
- "route_maps": {
- "rmap_ipv4": [{
- "delete": True
- }]
- }
- }
- }
+ routemaps = {"r0": {"route_maps": {"rmap_ipv4": [{"delete": True}]}}}
result = create_route_maps(tgen, routemaps)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Configure route map with continue clause")
# Create route map
input_dict_3 = {
- "r0": {
- "route_maps": {
- "rmap_ipv4": [{
- "action": "permit",
- 'seq_id': '10',
- "match": {
- "ipv4": {
- "prefix_lists": "pf_list_1_ipv4"
- }
- },
- "set": {
- "med": 150
- },
- "continue": "30",
- "seq_id": 10
- },
- {
- "action": "permit",
- "match": {
- "ipv4": {
- "prefix_lists": "pf_list_1_ipv4"
- }
- },
- "set": {
- "med": 100
+ "r0": {
+ "route_maps": {
+ "rmap_ipv4": [
+ {
+ "action": "permit",
+ "seq_id": "10",
+ "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}},
+ "set": {"med": 150},
+ "continue": "30",
+ "seq_id": 10,
},
- "seq_id": 20
- },
- {
- "action": "permit",
- "match": {
- "ipv4": {
- "prefix_lists": "pf_list_1_ipv4"
- }
+ {
+ "action": "permit",
+ "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}},
+ "set": {"med": 100},
+ "seq_id": 20,
},
- "set": {
- "med": 50
+ {
+ "action": "permit",
+ "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}},
+ "set": {"med": 50},
+ "seq_id": 30,
},
- "seq_id": 30
- }
- ]
+ ]
+ }
}
}
- }
result = create_route_maps(tgen, input_dict_3)
- assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_ospf_rib(tgen, dut, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Configure route map with goto clause")
# Create route map
input_dict_3 = {
- "r0": {
- "route_maps": {
- "rmap_ipv4": [{
- "action": "permit",
- 'seq_id': '10',
- "match": {
- "ipv4": {
- "prefix_lists": "pf_list_1_ipv4"
- }
+ "r0": {
+ "route_maps": {
+ "rmap_ipv4": [
+ {
+ "action": "permit",
+ "seq_id": "10",
+ "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}},
+ "goto": "30",
},
- "goto": "30",
- },
- {
- "action": "permit",
- 'seq_id': '20',
- "match": {
- "ipv4": {
- "prefix_lists": "pf_list_1_ipv4"
- }
+ {
+ "action": "permit",
+ "seq_id": "20",
+ "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}},
+ "set": {"med": 100},
},
- "set": {
- "med": 100
- }
- },
- {
- "action": "permit",
- 'seq_id': '30',
- "match": {
- "ipv4": {
- "prefix_lists": "pf_list_1_ipv4"
- }
+ {
+ "action": "permit",
+ "seq_id": "30",
+ "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}},
+ "set": {"med": 200},
},
- "set": {
- "med": 200
- }
- }
- ]
+ ]
+ }
}
}
- }
result = create_route_maps(tgen, input_dict_3)
- assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
@@ -1003,7 +866,11 @@ def test_ospf_routemaps_functionality_tc24_p0(request):
input_dict = {
"r0": {
"static_routes": [
- {"network": NETWORK["ipv4"][0], "no_of_ip": 1, "next_hop": "Null0",}
+ {
+ "network": NETWORK["ipv4"][0],
+ "no_of_ip": 1,
+ "next_hop": "Null0",
+ }
]
}
}
@@ -1037,9 +904,10 @@ def test_ospf_routemaps_functionality_tc24_p0(request):
step("verify that prefix-list is created in R0.")
result = verify_prefix_lists(tgen, pfx_list)
- assert result is not True, (
- "Testcase {} : Failed \n Prefix list not "
- "present. Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Prefix list not " "present. Error: {}".format(
+ tc_name, result
)
# Create route map
@@ -1105,9 +973,10 @@ def test_ospf_routemaps_functionality_tc24_p0(request):
step("verify that prefix-list is created in R0.")
result = verify_prefix_lists(tgen, pfx_list)
- assert result is not True, (
- "Testcase {} : Failed \n Prefix list not "
- "present. Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Prefix list not " "present. Error: {}".format(
+ tc_name, result
)
# Create route map
diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py b/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py
index 6ac0b515df..434d7f8ef5 100644
--- a/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py
+++ b/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py
@@ -407,7 +407,13 @@ def test_ospf_redistribution_tc6_p0(request):
protocol = "ospf"
result = verify_rib(
- tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh, expected=False,
+ tgen,
+ "ipv4",
+ dut,
+ input_dict,
+ protocol=protocol,
+ next_hop=nh,
+ expected=False,
)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
@@ -549,7 +555,11 @@ def test_ospf_redistribution_tc8_p1(request):
input_dict = {
"r0": {
"static_routes": [
- {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",}
+ {
+ "network": NETWORK["ipv4"][0],
+ "no_of_ip": 5,
+ "next_hop": "Null0",
+ }
]
}
}
diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py b/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py
index b2b0fb8d44..6f6b119abc 100644
--- a/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py
+++ b/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py
@@ -777,7 +777,6 @@ def test_ospf_show_p1(request):
write_test_footer(tc_name)
-
def test_ospf_dead_tc11_p0(request):
"""
OSPF timers.
@@ -799,224 +798,146 @@ def test_ospf_dead_tc11_p0(request):
step("modify dead interval from default value to some other value on r1")
topo1 = {
- 'r1': {
- 'links': {
- 'r0': {
- 'interface': topo['routers']['r1']['links']['r0'][
- 'interface'],
- 'ospf': {
- 'hello_interval': 12,
- 'dead_interval': 48
- }
+ "r1": {
+ "links": {
+ "r0": {
+ "interface": topo["routers"]["r1"]["links"]["r0"]["interface"],
+ "ospf": {"hello_interval": 12, "dead_interval": 48},
}
}
}
}
-
result = create_interfaces_cfg(tgen, topo1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step(
"verify that new timer value is configured and applied using "
- "the show ip ospf interface command.")
- dut = 'r1'
- input_dict= {
- 'r1': {
- 'links':{
- 'r0': {
- 'ospf':{
- 'timerDeadSecs': 48
- }
- }
- }
- }
- }
+ "the show ip ospf interface command."
+ )
+ dut = "r1"
+ input_dict = {"r1": {"links": {"r0": {"ospf": {"timerDeadSecs": 48}}}}}
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- step(
- "modify dead interval from default value to r1"
- "dead interval timer on r2")
+ step("modify dead interval from default value to r1" "dead interval timer on r2")
topo1 = {
- 'r0': {
- 'links': {
- 'r1': {
- 'interface': topo['routers']['r0']['links']['r1'][
- 'interface'],
- 'ospf': {
- 'dead_interval': 48,
- 'hello_interval': 12
- }
+ "r0": {
+ "links": {
+ "r1": {
+ "interface": topo["routers"]["r0"]["links"]["r1"]["interface"],
+ "ospf": {"dead_interval": 48, "hello_interval": 12},
}
}
}
}
result = create_interfaces_cfg(tgen, topo1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
-
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("verify that new timer value is configured.")
- input_dict= {
- 'r0': {
- 'links':{
- 'r1': {
- 'ospf':{
- 'timerDeadSecs': 48
- }
- }
- }
- }
- }
- dut = 'r0'
+ input_dict = {"r0": {"links": {"r1": {"ospf": {"timerDeadSecs": 48}}}}}
+ dut = "r0"
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("verify that ospf neighbours are full")
ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
- step(
- "reconfigure the default dead interval timer value to "
- "default on r1 and r2")
+ step("reconfigure the default dead interval timer value to " "default on r1 and r2")
topo1 = {
- 'r0': {
- 'links': {
- 'r1': {
- 'interface': topo['routers']['r0']['links']['r1'][
- 'interface'],
- 'ospf': {
- 'dead_interval': 40
- }
+ "r0": {
+ "links": {
+ "r1": {
+ "interface": topo["routers"]["r0"]["links"]["r1"]["interface"],
+ "ospf": {"dead_interval": 40},
}
}
}
}
result = create_interfaces_cfg(tgen, topo1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
topo1 = {
- 'r1': {
- 'links': {
- 'r0': {
- 'interface': topo['routers']['r1']['links']['r0'][
- 'interface'],
- 'ospf': {
- 'dead_interval': 40
- }
+ "r1": {
+ "links": {
+ "r0": {
+ "interface": topo["routers"]["r1"]["links"]["r0"]["interface"],
+ "ospf": {"dead_interval": 40},
}
}
}
}
result = create_interfaces_cfg(tgen, topo1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("verify that new timer value is configured.")
- input_dict= {
- 'r0': {
- 'links':{
- 'r1': {
- 'ospf':{
- 'timerDeadSecs': 40
- }
- }
- }
- }
- }
- dut = 'r0'
+ input_dict = {"r0": {"links": {"r1": {"ospf": {"timerDeadSecs": 40}}}}}
+ dut = "r0"
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
-
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("verify that ospf neighbours are full")
ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
-
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step(" Configure dead timer = 65535 on r1 and r2")
topo1 = {
- 'r0': {
- 'links': {
- 'r1': {
- 'interface': topo['routers']['r0']['links']['r1'][
- 'interface'],
- 'ospf': {
- 'dead_interval': 65535
- }
+ "r0": {
+ "links": {
+ "r1": {
+ "interface": topo["routers"]["r0"]["links"]["r1"]["interface"],
+ "ospf": {"dead_interval": 65535},
}
}
}
}
result = create_interfaces_cfg(tgen, topo1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
topo1 = {
- 'r1': {
- 'links': {
- 'r0': {
- 'interface': topo['routers']['r1']['links']['r0'][
- 'interface'],
- 'ospf': {
- 'dead_interval': 65535
- }
+ "r1": {
+ "links": {
+ "r0": {
+ "interface": topo["routers"]["r1"]["links"]["r0"]["interface"],
+ "ospf": {"dead_interval": 65535},
}
}
}
}
result = create_interfaces_cfg(tgen, topo1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("verify that new timer value is configured.")
- input_dict= {
- 'r0': {
- 'links':{
- 'r1': {
- 'ospf':{
- 'timerDeadSecs': 65535
- }
- }
- }
- }
- }
- dut = 'r0'
+ input_dict = {"r0": {"links": {"r1": {"ospf": {"timerDeadSecs": 65535}}}}}
+ dut = "r0"
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("verify that ospf neighbours are full")
ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
-
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step(" Try configuring timer values outside range for example 65536")
topo1 = {
- 'r0': {
- 'links': {
- 'r1': {
- 'interface': topo['routers']['r0']['links']['r1'][
- 'interface'],
- 'ospf': {
- 'dead_interval': 65536
- }
+ "r0": {
+ "links": {
+ "r1": {
+ "interface": topo["routers"]["r0"]["links"]["r1"]["interface"],
+ "ospf": {"dead_interval": 65536},
}
}
}
@@ -1024,47 +945,33 @@ def test_ospf_dead_tc11_p0(request):
result = create_interfaces_cfg(tgen, topo1)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step("Unconfigure the dead timer from the interface from r1 and r2.")
topo1 = {
- 'r1': {
- 'links': {
- 'r0': {
- 'interface': topo['routers']['r1']['links']['r0'][
- 'interface'],
- 'ospf': {
- 'dead_interval': 65535
- },
- 'delete': True
+ "r1": {
+ "links": {
+ "r0": {
+ "interface": topo["routers"]["r1"]["links"]["r0"]["interface"],
+ "ospf": {"dead_interval": 65535},
+ "delete": True,
}
}
}
}
result = create_interfaces_cfg(tgen, topo1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step(
- "Verify that timer value is deleted from intf & "
- "set to default value 40 sec.")
- input_dict= {
- 'r1': {
- 'links':{
- 'r0': {
- 'ospf':{
- 'timerDeadSecs': 40
- }
- }
- }
- }
- }
- dut = 'r1'
+ "Verify that timer value is deleted from intf & " "set to default value 40 sec."
+ )
+ input_dict = {"r1": {"links": {"r0": {"ospf": {"timerDeadSecs": 40}}}}}
+ dut = "r1"
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
diff --git a/tests/topotests/rip-topo1/test_rip_topo1.py b/tests/topotests/rip-topo1/test_rip_topo1.py
index de11b78824..fdafa50aba 100644
--- a/tests/topotests/rip-topo1/test_rip_topo1.py
+++ b/tests/topotests/rip-topo1/test_rip_topo1.py
@@ -352,9 +352,11 @@ def test_zebra_ipv4_routingTable():
else:
print("r%s ok" % i)
- assert failures == 0, (
- "Zebra IPv4 Routing Table verification failed for router r%s:\n%s"
- % (i, diff)
+ assert (
+ failures == 0
+ ), "Zebra IPv4 Routing Table verification failed for router r%s:\n%s" % (
+ i,
+ diff,
)
# Make sure that all daemons are still running
diff --git a/tests/topotests/ripng-topo1/test_ripng_topo1.py b/tests/topotests/ripng-topo1/test_ripng_topo1.py
index 2976cdefe4..4702d33dae 100644
--- a/tests/topotests/ripng-topo1/test_ripng_topo1.py
+++ b/tests/topotests/ripng-topo1/test_ripng_topo1.py
@@ -373,9 +373,11 @@ def test_zebra_ipv6_routingTable():
else:
print("r%s ok" % i)
- assert failures == 0, (
- "Zebra IPv6 Routing Table verification failed for router r%s:\n%s"
- % (i, diff)
+ assert (
+ failures == 0
+ ), "Zebra IPv6 Routing Table verification failed for router r%s:\n%s" % (
+ i,
+ diff,
)
# Make sure that all daemons are running
diff --git a/tests/topotests/zebra_netlink/test_zebra_netlink.py b/tests/topotests/zebra_netlink/test_zebra_netlink.py
index 4da5303641..94baf8438f 100644
--- a/tests/topotests/zebra_netlink/test_zebra_netlink.py
+++ b/tests/topotests/zebra_netlink/test_zebra_netlink.py
@@ -118,7 +118,12 @@ def test_zebra_netlink_batching():
r1.vtysh_cmd("sharp install routes 2.1.3.7 nexthop 192.168.1.1 100")
json_file = "{}/r1/v4_route.json".format(CWD)
expected = json.loads(open(json_file).read())
- test_func = partial(topotest.router_json_cmp, r1, "show ip route json", expected,)
+ test_func = partial(
+ topotest.router_json_cmp,
+ r1,
+ "show ip route json",
+ expected,
+ )
_, result = topotest.run_and_expect(test_func, None, count=2, wait=0.5)
assertmsg = '"r1" JSON output mismatches'
assert result is None, assertmsg