From 9fa6ec14737b94fdfb41539d96c7e4f84f3514b6 Mon Sep 17 00:00:00 2001 From: whitespace Date: Thu, 17 Dec 2020 15:32:11 -0500 Subject: [PATCH] tests: please follow the style guide thanks Signed-off-by: Quentin Young --- .../test_all_protocol_startup.py | 24 +- .../test_bfd_bgp_cbit_topo3.py | 3 +- .../test_bgp_basic_functionality.py | 30 +- .../bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py | 26 +- .../bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py | 26 +- tests/topotests/bgp-evpn-mh/test_evpn_mh.py | 21 +- .../test_bgp_evpn_vxlan.py | 6 +- .../test_bgp_path_attributes.py | 7 +- .../bgp-route-map/test_route_map_topo1.py | 33 +- .../bgp-route-map/test_route_map_topo2.py | 64 +- .../test_bgp-vrf-route-leak-basic.py | 18 +- .../bgp_features/test_bgp_features.py | 254 ++++++-- .../test_bgp_gr_functionality_topo1.py | 158 ++++- .../test_bgp_gr_functionality_topo2.py | 31 +- .../test_bgp_large_community_topo_2.py | 12 +- .../test_bgp_multi_vrf_topo2.py | 38 +- .../bgp_prefix_sid/test_bgp_prefix_sid.py | 6 +- ...test_bgp_recursive_route_ebgp_multi_hop.py | 53 +- .../test_bgp_aggregation.py | 17 +- .../bgp_suppress_fib/test_bgp_suppress_fib.py | 2 + .../test_bgp_vrf_dynamic_route_leak_topo2.py | 7 +- .../isis-lfa-topo1/test_isis_lfa_topo1.py | 343 ++++++---- .../isis-tilfa-topo1/test_isis_tilfa_topo1.py | 378 ++++++----- tests/topotests/ldp-topo1/test_ldp_topo1.py | 8 +- tests/topotests/lib/common_config.py | 27 +- tests/topotests/lib/ospf.py | 12 +- tests/topotests/lib/test/test_json.py | 229 ++++++- tests/topotests/lib/topogen.py | 4 +- tests/topotests/lib/topotest.py | 75 ++- .../ospf-topo1-vrf/test_ospf_topo1_vrf.py | 6 +- tests/topotests/ospf-topo1/test_ospf_topo1.py | 29 +- .../test_ospf_ecmp.py | 12 +- .../test_ospf_routemaps.py | 609 +++++++----------- .../test_ospf_rte_calc.py | 14 +- .../test_ospf_single_area.py | 253 +++----- tests/topotests/rip-topo1/test_rip_topo1.py | 8 +- .../topotests/ripng-topo1/test_ripng_topo1.py | 8 +- .../zebra_netlink/test_zebra_netlink.py | 7 +- 38 files changed, 1752 insertions(+), 1106 deletions(-) diff --git a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py index 08378d9b58..ab9358408e 100644 --- a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py +++ b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py @@ -1016,6 +1016,7 @@ def test_bgp_ipv6_summary(): # For debugging after starting FRR daemons, uncomment the next line # CLI(net) + def test_nht(): print("\n\n**** Test that nexthop tracking is at least nominally working ****\n") @@ -1026,13 +1027,16 @@ def test_nht(): expected = open(nhtFile).read().rstrip() expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) - actual = (net["r%s" %i].cmd('vtysh -c "show ip nht" 2> /dev/null').rstrip()) + actual = net["r%s" % i].cmd('vtysh -c "show ip nht" 2> /dev/null').rstrip() actual = re.sub(r"fd [0-9][0-9]", "fd XX", actual) actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) - diff = topotest.get_textdiff(actual, expected, - title1="Actual `show ip nht`", - title2="Expected `show ip nht`") + diff = topotest.get_textdiff( + actual, + expected, + title1="Actual `show ip nht`", + title2="Expected `show ip nht`", + ) if diff: assert 0, "r%s failed ip nht check:\n%s\n" % (i, diff) @@ -1043,19 +1047,23 @@ def test_nht(): expected = open(nhtFile).read().rstrip() expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) - actual = (net["r%s" %i].cmd('vtysh -c "show ipv6 nht" 2> /dev/null').rstrip()) + actual = net["r%s" % i].cmd('vtysh -c "show ipv6 nht" 2> /dev/null').rstrip() actual = re.sub(r"fd [0-9][0-9]", "fd XX", actual) actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) - diff = topotest.get_textdiff(actual, expected, - title1="Actual `show ip nht`", - title2="Expected `show ip nht`") + diff = topotest.get_textdiff( + actual, + expected, + title1="Actual `show ip nht`", + title2="Expected `show ip nht`", + ) if diff: assert 0, "r%s failed ipv6 nht check:\n%s\n" % (i, diff) else: print("show ipv6 nht is ok\n") + def test_bgp_ipv4(): global fatal_error global net diff --git a/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py b/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py index 595132214b..98a5033f53 100644 --- a/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py +++ b/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py @@ -74,7 +74,8 @@ def setup_module(mod): for rname, router in router_list.items(): router.load_config( - TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname)), + TopoRouter.RD_ZEBRA, + os.path.join(CWD, "{}/zebra.conf".format(rname)), ) router.load_config( TopoRouter.RD_BFD, os.path.join(CWD, "{}/bfdd.conf".format(rname)) diff --git a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py index b3b7256ac4..b701a0d61e 100644 --- a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py +++ b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py @@ -282,10 +282,26 @@ def test_BGP_config_with_invalid_ASN_p2(request): # Api call to modify AS number input_dict = { - "r1": {"bgp": {"local_as": 0,}}, - "r2": {"bgp": {"local_as": 0,}}, - "r3": {"bgp": {"local_as": 0,}}, - "r4": {"bgp": {"local_as": 64000,}}, + "r1": { + "bgp": { + "local_as": 0, + } + }, + "r2": { + "bgp": { + "local_as": 0, + } + }, + "r3": { + "bgp": { + "local_as": 0, + } + }, + "r4": { + "bgp": { + "local_as": 64000, + } + }, } result = modify_as_number(tgen, topo, input_dict) try: @@ -819,7 +835,11 @@ def test_bgp_with_loopback_interface(request): # Adding ['source_link'] = 'lo' key:value pair topo["routers"][routerN]["bgp"]["address_family"]["ipv4"]["unicast"][ "neighbor" - ][bgp_neighbor]["dest_link"] = {"lo": {"source_link": "lo",}} + ][bgp_neighbor]["dest_link"] = { + "lo": { + "source_link": "lo", + } + } # Creating configuration from JSON build_config_from_json(tgen, topo) diff --git a/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py b/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py index 54a3c699f3..a7cce25f7f 100644 --- a/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py +++ b/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py @@ -273,8 +273,20 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type): "r3": { "bgp": { "address_family": { - "ipv4": {"unicast": {"maximum_paths": {"ebgp": ecmp_num,}}}, - "ipv6": {"unicast": {"maximum_paths": {"ebgp": ecmp_num,}}}, + "ipv4": { + "unicast": { + "maximum_paths": { + "ebgp": ecmp_num, + } + } + }, + "ipv6": { + "unicast": { + "maximum_paths": { + "ebgp": ecmp_num, + } + } + }, } } } @@ -303,7 +315,7 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type): input_dict_1, next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)], protocol=protocol, - count_only=True + count_only=True, ) assert result is True, "Testcase {} : Failed \n Error: {}".format( @@ -371,8 +383,8 @@ def test_ecmp_after_clear_bgp(request, test_type): def test_ecmp_remove_redistribute_static(request): - """ Verify routes are cleared from BGP and RIB table of DUT when - redistribute static configuration is removed.""" + """Verify routes are cleared from BGP and RIB table of DUT when + redistribute static configuration is removed.""" tc_name = request.node.name write_test_header(tc_name) @@ -481,8 +493,8 @@ def test_ecmp_remove_redistribute_static(request): @pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"]) def test_ecmp_shut_bgp_neighbor(request, test_type): - """ Shut BGP neigbors one by one and verify BGP and routing table updated - accordingly in DUT """ + """Shut BGP neigbors one by one and verify BGP and routing table updated + accordingly in DUT""" tc_name = request.node.name write_test_header(tc_name) diff --git a/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py b/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py index 73724ac069..893c5642dc 100644 --- a/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py +++ b/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py @@ -274,8 +274,20 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type): "r3": { "bgp": { "address_family": { - "ipv4": {"unicast": {"maximum_paths": {"ibgp": ecmp_num,}}}, - "ipv6": {"unicast": {"maximum_paths": {"ibgp": ecmp_num,}}}, + "ipv4": { + "unicast": { + "maximum_paths": { + "ibgp": ecmp_num, + } + } + }, + "ipv6": { + "unicast": { + "maximum_paths": { + "ibgp": ecmp_num, + } + } + }, } } } @@ -304,7 +316,7 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type): input_dict_1, next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)], protocol=protocol, - count_only=True + count_only=True, ) assert result is True, "Testcase {} : Failed \n Error: {}".format( @@ -372,8 +384,8 @@ def test_ecmp_after_clear_bgp(request, test_type): def test_ecmp_remove_redistribute_static(request): - """ Verify routes are cleared from BGP and RIB table of DUT when - redistribute static configuration is removed.""" + """Verify routes are cleared from BGP and RIB table of DUT when + redistribute static configuration is removed.""" tc_name = request.node.name write_test_header(tc_name) @@ -482,8 +494,8 @@ def test_ecmp_remove_redistribute_static(request): @pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"]) def test_ecmp_shut_bgp_neighbor(request, test_type): - """ Shut BGP neigbors one by one and verify BGP and routing table updated - accordingly in DUT """ + """Shut BGP neigbors one by one and verify BGP and routing table updated + accordingly in DUT""" tc_name = request.node.name write_test_header(tc_name) diff --git a/tests/topotests/bgp-evpn-mh/test_evpn_mh.py b/tests/topotests/bgp-evpn-mh/test_evpn_mh.py index 4c56d1a02d..6a24684649 100644 --- a/tests/topotests/bgp-evpn-mh/test_evpn_mh.py +++ b/tests/topotests/bgp-evpn-mh/test_evpn_mh.py @@ -658,10 +658,11 @@ def test_evpn_mac(): assertmsg = '"{}" remote MAC content incorrect'.format(tor.name) assert result is None, assertmsg + def check_df_role(dut, esi, role): - ''' + """ Return error string if the df role on the dut is different - ''' + """ es_json = dut.vtysh_cmd("show evpn es %s json" % esi) es = json.loads(es_json) @@ -676,12 +677,13 @@ def check_df_role(dut, esi, role): return None + def test_evpn_df(): - ''' + """ 1. Check the DF role on all the PEs on rack-1. 2. Increase the DF preference on the non-DF and check if it becomes the DF winner. - ''' + """ tgen = get_topogen() @@ -720,10 +722,11 @@ def test_evpn_df(): # tgen.mininet_cli() + def check_protodown_rc(dut, protodown_rc): - ''' + """ check if specified protodown reason code is set - ''' + """ out = dut.vtysh_cmd("show evpn json") @@ -739,13 +742,14 @@ def check_protodown_rc(dut, protodown_rc): return None + def test_evpn_uplink_tracking(): - ''' + """ 1. Wait for access ports to come out of startup-delay 2. disable uplinks and check if access ports have been protodowned 3. enable uplinks and check if access ports have been moved out of protodown - ''' + """ tgen = get_topogen() @@ -778,6 +782,7 @@ def test_evpn_uplink_tracking(): assertmsg = '"{}" protodown rc incorrect'.format(dut_name) assert result is None, assertmsg + if __name__ == "__main__": args = ["-s"] + sys.argv[1:] sys.exit(pytest.main(args)) diff --git a/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py b/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py index 5098808d55..9a38158b2b 100755 --- a/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py +++ b/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py @@ -310,8 +310,10 @@ def ip_learn_test(tgen, host, local, remote, ip_addr): assertmsg = "local learned mac wrong type: {} ".format(mac_type) assert mac_type == "local", assertmsg - assertmsg = "learned address mismatch with configured address host: {} learned: {}".format( - ip_addr, learned_ip + assertmsg = ( + "learned address mismatch with configured address host: {} learned: {}".format( + ip_addr, learned_ip + ) ) assert ip_addr == learned_ip, assertmsg diff --git a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py index 607b036c6a..a9541a55c5 100644 --- a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py +++ b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py @@ -238,9 +238,10 @@ def test_next_hop_attribute(request): result = verify_rib( tgen, addr_type, dut, input_dict, protocol=protocol, expected=False ) - assert result is not True, ( - "Testcase {} : Failed \n Error: " - "{} routes are not present in RIB".format(addr_type, tc_name) + assert ( + result is not True + ), "Testcase {} : Failed \n Error: " "{} routes are not present in RIB".format( + addr_type, tc_name ) # Configure next-hop-self to bgp neighbor diff --git a/tests/topotests/bgp-route-map/test_route_map_topo1.py b/tests/topotests/bgp-route-map/test_route_map_topo1.py index 1aa951edaa..0158e24d31 100644 --- a/tests/topotests/bgp-route-map/test_route_map_topo1.py +++ b/tests/topotests/bgp-route-map/test_route_map_topo1.py @@ -807,7 +807,9 @@ def test_route_map_multiple_seq_different_match_set_clause_p0(request): "prefix_lists": "pf_list_2_{}".format(addr_type) } }, - "set": {"locPrf": 150,}, + "set": { + "locPrf": 150, + }, }, { "action": "permit", @@ -906,7 +908,17 @@ def test_route_map_multiple_seq_different_match_set_clause_p0(request): dut = "r3" protocol = "bgp" input_dict = { - "r3": {"route_maps": {"rmap_match_pf_list1": [{"set": {"metric": 50,}}],}} + "r3": { + "route_maps": { + "rmap_match_pf_list1": [ + { + "set": { + "metric": 50, + } + } + ], + } + } } static_routes = [NETWORK[adt][0]] @@ -1093,7 +1105,14 @@ def test_route_map_set_only_no_match_p0(request): input_dict_4 = { "r3": { "route_maps": { - "rmap_match_pf_1": [{"action": "permit", "set": {"metric": 50,}}] + "rmap_match_pf_1": [ + { + "action": "permit", + "set": { + "metric": 50, + }, + } + ] } } } @@ -1210,7 +1229,13 @@ def test_route_map_match_only_no_set_p0(request): "r1": { "route_maps": { "rmap_match_pf_1_{}".format(addr_type): [ - {"action": "permit", "set": {"metric": 50, "locPrf": 150,}} + { + "action": "permit", + "set": { + "metric": 50, + "locPrf": 150, + }, + } ] } } diff --git a/tests/topotests/bgp-route-map/test_route_map_topo2.py b/tests/topotests/bgp-route-map/test_route_map_topo2.py index 3056aa29f3..958eceba62 100644 --- a/tests/topotests/bgp-route-map/test_route_map_topo2.py +++ b/tests/topotests/bgp-route-map/test_route_map_topo2.py @@ -268,12 +268,20 @@ def test_rmap_match_prefix_list_permit_in_and_outbound_prefixes_p0(): "prefix_lists": { "ipv4": { "pf_list_1_ipv4": [ - {"seqid": 10, "network": "any", "action": "permit",} + { + "seqid": 10, + "network": "any", + "action": "permit", + } ] }, "ipv6": { "pf_list_1_ipv6": [ - {"seqid": 10, "network": "any", "action": "permit",} + { + "seqid": 10, + "network": "any", + "action": "permit", + } ] }, } @@ -472,7 +480,11 @@ def test_modify_set_match_clauses_in_rmap_p0(): "prefix_lists": { "ipv4": { "pf_list_1_ipv4": [ - {"seqid": 10, "network": "any", "action": "permit",} + { + "seqid": 10, + "network": "any", + "action": "permit", + } ], "pf_list_2_ipv4": [ {"seqid": 10, "network": "any", "action": "permit"} @@ -480,7 +492,11 @@ def test_modify_set_match_clauses_in_rmap_p0(): }, "ipv6": { "pf_list_1_ipv6": [ - {"seqid": 10, "network": "any", "action": "permit",} + { + "seqid": 10, + "network": "any", + "action": "permit", + } ], "pf_list_2_ipv6": [ {"seqid": 10, "network": "any", "action": "permit"} @@ -506,7 +522,9 @@ def test_modify_set_match_clauses_in_rmap_p0(): "prefix_lists": "pf_list_1_{}".format(addr_type) } }, - "set": {"locPrf": 150,}, + "set": { + "locPrf": 150, + }, } ], "rmap_match_pf_2_{}".format(addr_type): [ @@ -666,7 +684,9 @@ def test_modify_set_match_clauses_in_rmap_p0(): "prefix_lists": "pf_list_1_{}".format(addr_type) } }, - "set": {"locPrf": 1000,}, + "set": { + "locPrf": 1000, + }, } ], "rmap_match_pf_2_{}".format(addr_type): [ @@ -816,12 +836,20 @@ def test_modify_prefix_list_referenced_by_rmap_p0(): "prefix_lists": { "ipv4": { "pf_list_1_ipv4": [ - {"seqid": 10, "network": "any", "action": "permit",} + { + "seqid": 10, + "network": "any", + "action": "permit", + } ] }, "ipv6": { "pf_list_1_ipv6": [ - {"seqid": 100, "network": "any", "action": "permit",} + { + "seqid": 100, + "network": "any", + "action": "permit", + } ] }, } @@ -1090,7 +1118,9 @@ def test_remove_prefix_list_referenced_by_rmap_p0(): "prefix_lists": "pf_list_1_{}".format(addr_type) } }, - "set": {"locPrf": 150,}, + "set": { + "locPrf": 150, + }, } ], "rmap_match_pf_2_{}".format(addr_type): [ @@ -1894,7 +1924,9 @@ def test_multiple_match_statement_in_route_map_logical_ANDed_p1(): "prefix_lists": "pf_list_1_{}".format(addr_type) } }, - "set": {"locPrf": 150,}, + "set": { + "locPrf": 150, + }, } ] } @@ -1921,7 +1953,9 @@ def test_multiple_match_statement_in_route_map_logical_ANDed_p1(): } } }, - "set": {"locPrf": 150,}, + "set": { + "locPrf": 150, + }, } ] } @@ -2048,7 +2082,9 @@ def test_add_remove_rmap_to_specific_neighbor_p0(): "prefix_lists": "pf_list_1_{}".format(addr_type) } }, - "set": {"locPrf": 150,}, + "set": { + "locPrf": 150, + }, } ] } @@ -3505,7 +3541,9 @@ def test_create_rmap_match_prefix_list_to_deny_in_and_outbound_prefixes_p0(): "prefix_lists": "pf_list_1_{}".format(addr_type) } }, - "set": {"locPrf": 150,}, + "set": { + "locPrf": 150, + }, } ], "rmap_match_pf_2_{}".format(addr_type): [ diff --git a/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py b/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py index 36f1d8cd56..71f64e9b70 100644 --- a/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py +++ b/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py @@ -90,7 +90,11 @@ def test_vrf_route_leak(): # Test DONNA VRF. expect = { - "10.0.0.0/24": [{"protocol": "connected",}], + "10.0.0.0/24": [ + { + "protocol": "connected", + } + ], "10.0.1.0/24": [ {"protocol": "bgp", "selected": True, "nexthops": [{"fib": True}]} ], @@ -111,11 +115,19 @@ def test_vrf_route_leak(): "10.0.0.0/24": [ {"protocol": "bgp", "selected": True, "nexthops": [{"fib": True}]} ], - "10.0.1.0/24": [{"protocol": "connected",}], + "10.0.1.0/24": [ + { + "protocol": "connected", + } + ], "10.0.2.0/24": [ {"protocol": "bgp", "selected": True, "nexthops": [{"fib": True}]} ], - "10.0.3.0/24": [{"protocol": "connected",}], + "10.0.3.0/24": [ + { + "protocol": "connected", + } + ], } test_func = partial( diff --git a/tests/topotests/bgp_features/test_bgp_features.py b/tests/topotests/bgp_features/test_bgp_features.py index bc821bd7a0..5f3809c2b3 100644 --- a/tests/topotests/bgp_features/test_bgp_features.py +++ b/tests/topotests/bgp_features/test_bgp_features.py @@ -753,41 +753,71 @@ def test_bgp_delayopen_without(): pytest.skip(tgen.errors) # part 1: no delay r1 <=> no delay r4 - logger.info("Starting optional test of BGP functionality without DelayOpenTimer enabled to establish a reference for following tests") + logger.info( + "Starting optional test of BGP functionality without DelayOpenTimer enabled to establish a reference for following tests" + ) # 1.1 enable peering shutdown logger.info("Enable shutdown of peering between r1 and r4") - tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"') - tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"') + tgen.net["r1"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"' + ) + tgen.net["r4"].cmd( + 'vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"' + ) # 1.2 wait for peers to shut down (poll output) for router_num in [1, 4]: - logger.info("Checking BGP summary after enabling shutdown of peering on r{}".format(router_num)) + logger.info( + "Checking BGP summary after enabling shutdown of peering on r{}".format( + router_num + ) + ) router = tgen.gears["r{}".format(router_num)] - reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num)) + reffile = os.path.join( + CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num) + ) expected = json.loads(open(reffile).read()) - test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) + test_func = functools.partial( + topotest.router_json_cmp, router, "show ip bgp summary json", expected + ) _, res = topotest.run_and_expect(test_func, None, count=3, wait=1) assertmsg = "BGP session on r{} did not shut down peer".format(router_num) assert res is None, assertmsg # 1.3 disable peering shutdown logger.info("Disable shutdown of peering between r1 and r4") - tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"') - tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"') + tgen.net["r1"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"' + ) + tgen.net["r4"].cmd( + 'vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"' + ) # 1.4 wait for peers to establish connection (poll output) for router_num in [1, 4]: - logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) + logger.info( + "Checking BGP summary after disabling shutdown of peering on r{}".format( + router_num + ) + ) router = tgen.gears["r{}".format(router_num)] - reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num)) + reffile = os.path.join( + CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num) + ) expected = json.loads(open(reffile).read()) - test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) + test_func = functools.partial( + topotest.router_json_cmp, router, "show ip bgp summary json", expected + ) _, res = topotest.run_and_expect(test_func, None, count=5, wait=1) - assertmsg = "BGP session on r{} did not establish a connection with peer".format(router_num) + assertmsg = ( + "BGP session on r{} did not establish a connection with peer".format( + router_num + ) + ) assert res is None, assertmsg - #tgen.mininet_cli() + # tgen.mininet_cli() # end test_bgp_delayopen_without @@ -800,65 +830,107 @@ def test_bgp_delayopen_singular(): pytest.skip(tgen.errors) # part 2: delay 240s r1 <=> no delay r4 - logger.info("Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on one side of the peering") + logger.info( + "Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on one side of the peering" + ) # 2.1 enable peering shutdown logger.info("Enable shutdown of peering between r1 and r4") - tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"') - tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"') + tgen.net["r1"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"' + ) + tgen.net["r4"].cmd( + 'vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"' + ) # 2.2 wait for peers to shut down (poll output) for router_num in [1, 4]: - logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) + logger.info( + "Checking BGP summary after disabling shutdown of peering on r{}".format( + router_num + ) + ) router = tgen.gears["r{}".format(router_num)] - reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num)) + reffile = os.path.join( + CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num) + ) expected = json.loads(open(reffile).read()) - test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) + test_func = functools.partial( + topotest.router_json_cmp, router, "show ip bgp summary json", expected + ) _, res = topotest.run_and_expect(test_func, None, count=3, wait=1) assertmsg = "BGP session on r{} did not shut down peer".format(router_num) assert res is None, assertmsg # 2.3 set delayopen on R1 to 240 logger.info("Setting DelayOpenTime for neighbor r4 to 240 seconds on r1") - tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 timers delayopen 240"') + tgen.net["r1"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 timers delayopen 240"' + ) # 2.4 check config (poll output) logger.info("Checking BGP neighbor configuration after setting DelayOpenTime on r1") router = tgen.gears["r1"] reffile = os.path.join(CWD, "r1/bgp_delayopen_neighbor.json") expected = json.loads(open(reffile).read()) - test_func = functools.partial(topotest.router_json_cmp, router, "show bgp neighbors json", expected) + test_func = functools.partial( + topotest.router_json_cmp, router, "show bgp neighbors json", expected + ) _, res = topotest.run_and_expect(test_func, None, count=3, wait=1) assertmsg = "BGP session on r1 failed to set DelayOpenTime for r4" assert res is None, assertmsg # 2.5 disable peering shutdown logger.info("Disable shutdown of peering between r1 and r4") - tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"') - tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"') + tgen.net["r1"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"' + ) + tgen.net["r4"].cmd( + 'vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"' + ) # 2.6 wait for peers to establish connection (poll output) for router_num in [1, 4]: - logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) + logger.info( + "Checking BGP summary after disabling shutdown of peering on r{}".format( + router_num + ) + ) router = tgen.gears["r{}".format(router_num)] - reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num)) + reffile = os.path.join( + CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num) + ) expected = json.loads(open(reffile).read()) - test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) + test_func = functools.partial( + topotest.router_json_cmp, router, "show ip bgp summary json", expected + ) _, res = topotest.run_and_expect(test_func, None, count=5, wait=1) - assertmsg = "BGP session on r{} did not establish a connection with peer".format(router_num) + assertmsg = ( + "BGP session on r{} did not establish a connection with peer".format( + router_num + ) + ) assert res is None, assertmsg # 2.7 unset delayopen on R1 logger.info("Disabling DelayOpenTimer for neighbor r4 on r1") - tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 timers delayopen"') + tgen.net["r1"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 timers delayopen"' + ) # 2.8 check config (poll output) - logger.info("Checking BGP neighbor configuration after disabling DelayOpenTimer on r1") - delayopen_cfg = tgen.net["r1"].cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"').rstrip() + logger.info( + "Checking BGP neighbor configuration after disabling DelayOpenTimer on r1" + ) + delayopen_cfg = ( + tgen.net["r1"] + .cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"') + .rstrip() + ) assertmsg = "BGP session on r1 failed disable DelayOpenTimer for peer r4" assert delayopen_cfg == "", assertmsg - #tgen.mininet_cli() + # tgen.mininet_cli() # end test_bgp_delayopen_singular @@ -870,67 +942,119 @@ def test_bgp_delayopen_dual(): pytest.skip(tgen.errors) # part 3: delay 60s R2 <=> delay 30s R5 - logger.info("Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on both sides of the peering with different timer intervals") + logger.info( + "Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on both sides of the peering with different timer intervals" + ) # 3.1 enable peering shutdown logger.info("Enable shutdown of peering between r2 and r5") - tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 shutdown"') - tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 shutdown"') + tgen.net["r2"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 shutdown"' + ) + tgen.net["r5"].cmd( + 'vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 shutdown"' + ) # 3.2 wait for peers to shut down (pool output) for router_num in [2, 5]: - logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) + logger.info( + "Checking BGP summary after disabling shutdown of peering on r{}".format( + router_num + ) + ) router = tgen.gears["r{}".format(router_num)] - reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num)) + reffile = os.path.join( + CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num) + ) expected = json.loads(open(reffile).read()) - test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) + test_func = functools.partial( + topotest.router_json_cmp, router, "show ip bgp summary json", expected + ) _, res = topotest.run_and_expect(test_func, None, count=3, wait=1) assertmsg = "BGP session on r{} did not shut down peer".format(router_num) assert res is None, assertmsg # 3.3 set delayopen on R2 to 60s and on R5 to 30s logger.info("Setting DelayOpenTime for neighbor r5 to 60 seconds on r2") - tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 timers delayopen 60"') + tgen.net["r2"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 timers delayopen 60"' + ) logger.info("Setting DelayOpenTime for neighbor r2 to 30 seconds on r5") - tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 timers delayopen 30"') + tgen.net["r5"].cmd( + 'vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 timers delayopen 30"' + ) # 3.4 check config (poll output) for router_num in [2, 5]: - logger.info("Checking BGP neighbor configuration after setting DelayOpenTime on r{}i".format(router_num)) + logger.info( + "Checking BGP neighbor configuration after setting DelayOpenTime on r{}i".format( + router_num + ) + ) router = tgen.gears["r{}".format(router_num)] - reffile = os.path.join(CWD, "r{}/bgp_delayopen_neighbor.json".format(router_num)) + reffile = os.path.join( + CWD, "r{}/bgp_delayopen_neighbor.json".format(router_num) + ) expected = json.loads(open(reffile).read()) - test_func = functools.partial(topotest.router_json_cmp, router, "show bgp neighbors json", expected) + test_func = functools.partial( + topotest.router_json_cmp, router, "show bgp neighbors json", expected + ) _, res = topotest.run_and_expect(test_func, None, count=3, wait=1) assertmsg = "BGP session on r{} failed to set DelayOpenTime".format(router_num) assert res is None, assertmsg ## 3.5 disable peering shutdown logger.info("Disable shutdown of peering between r2 and r5") - tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 shutdown"') - tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 shutdown"') + tgen.net["r2"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 shutdown"' + ) + tgen.net["r5"].cmd( + 'vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 shutdown"' + ) ## 3.6 wait for peers to reach connect or active state (poll output) delay_start = int(time.time()) for router_num in [2, 5]: - logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) + logger.info( + "Checking BGP summary after disabling shutdown of peering on r{}".format( + router_num + ) + ) router = tgen.gears["r{}".format(router_num)] - reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_connect.json".format(router_num)) + reffile = os.path.join( + CWD, "r{}/bgp_delayopen_summary_connect.json".format(router_num) + ) expected = json.loads(open(reffile).read()) - test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) + test_func = functools.partial( + topotest.router_json_cmp, router, "show ip bgp summary json", expected + ) _, res = topotest.run_and_expect(test_func, None, count=3, wait=1) - assertmsg = "BGP session on r{} did not enter Connect state with peer".format(router_num) + assertmsg = "BGP session on r{} did not enter Connect state with peer".format( + router_num + ) assert res is None, assertmsg ## 3.7 wait for peers to establish connection (poll output) for router_num in [2, 5]: - logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) + logger.info( + "Checking BGP summary after disabling shutdown of peering on r{}".format( + router_num + ) + ) router = tgen.gears["r{}".format(router_num)] - reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num)) + reffile = os.path.join( + CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num) + ) expected = json.loads(open(reffile).read()) - test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) + test_func = functools.partial( + topotest.router_json_cmp, router, "show ip bgp summary json", expected + ) _, res = topotest.run_and_expect(test_func, None, count=35, wait=1) - assertmsg = "BGP session on r{} did not establish a connection with peer".format(router_num) + assertmsg = ( + "BGP session on r{} did not establish a connection with peer".format( + router_num + ) + ) assert res is None, assertmsg delay_stop = int(time.time()) @@ -939,18 +1063,32 @@ def test_bgp_delayopen_dual(): # 3.8 unset delayopen on R2 and R5 logger.info("Disabling DelayOpenTimer for neighbor r5 on r2") - tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 timers delayopen"') + tgen.net["r2"].cmd( + 'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 timers delayopen"' + ) logger.info("Disabling DelayOpenTimer for neighbor r2 on r5") - tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 timers delayopen"') + tgen.net["r5"].cmd( + 'vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 timers delayopen"' + ) # 3.9 check config (poll output) for router_num in [2, 5]: - logger.info("Checking BGP neighbor configuration after disabling DelayOpenTimer on r{}".format(router_num)) - delayopen_cfg = tgen.net["r{}".format(router_num)].cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"').rstrip() - assertmsg = "BGP session on r{} failed disable DelayOpenTimer".format(router_num) + logger.info( + "Checking BGP neighbor configuration after disabling DelayOpenTimer on r{}".format( + router_num + ) + ) + delayopen_cfg = ( + tgen.net["r{}".format(router_num)] + .cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"') + .rstrip() + ) + assertmsg = "BGP session on r{} failed disable DelayOpenTimer".format( + router_num + ) assert delayopen_cfg == "", assertmsg - #tgen.mininet_cli() + # tgen.mininet_cli() # end test_bgp_delayopen_dual diff --git a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py index 097b654e77..94fd17e012 100644 --- a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py +++ b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py @@ -316,7 +316,9 @@ def test_BGP_GR_TC_46_p1(request): input_dict = { "r1": { "bgp": { - "graceful-restart": {"graceful-restart": True,}, + "graceful-restart": { + "graceful-restart": True, + }, "address_family": { "ipv4": { "unicast": { @@ -1184,8 +1186,8 @@ def test_BGP_GR_TC_53_p1(request): def test_BGP_GR_TC_4_p0(request): """ - Test Objective : Verify that the restarting node sets "R" bit while sending the - BGP open messages after the node restart, only if GR is enabled. + Test Objective : Verify that the restarting node sets "R" bit while sending the + BGP open messages after the node restart, only if GR is enabled. """ tgen = get_topogen() @@ -1368,9 +1370,9 @@ def test_BGP_GR_TC_4_p0(request): def test_BGP_GR_TC_5_1_2_p1(request): """ - Test Objective : Verify if restarting node resets R bit in BGP open message - during normal BGP session flaps as well, even when GR restarting mode is enabled. - Here link flap happen due to interface UP/DOWN. + Test Objective : Verify if restarting node resets R bit in BGP open message + during normal BGP session flaps as well, even when GR restarting mode is enabled. + Here link flap happen due to interface UP/DOWN. """ tgen = get_topogen() @@ -1815,8 +1817,8 @@ def test_BGP_GR_TC_6_1_2_p1(request): def test_BGP_GR_TC_8_p1(request): """ - Test Objective : Verify that restarting nodes set "F" bit while sending - the BGP open messages after it restarts, only when BGP GR is enabled. + Test Objective : Verify that restarting nodes set "F" bit while sending + the BGP open messages after it restarts, only when BGP GR is enabled. """ tgen = get_topogen() @@ -1959,8 +1961,8 @@ def test_BGP_GR_TC_8_p1(request): def test_BGP_GR_TC_17_p1(request): """ - Test Objective : Verify that only GR helper routers keep the stale - route entries, not any GR disabled router. + Test Objective : Verify that only GR helper routers keep the stale + route entries, not any GR disabled router. """ tgen = get_topogen() @@ -2145,8 +2147,8 @@ def test_BGP_GR_TC_17_p1(request): def test_BGP_GR_TC_19_p1(request): """ - Test Objective : Verify that GR helper routers keeps all the routes received - from restarting node if both the routers are configured as GR restarting node. + Test Objective : Verify that GR helper routers keeps all the routes received + from restarting node if both the routers are configured as GR restarting node. """ tgen = get_topogen() @@ -2325,8 +2327,8 @@ def test_BGP_GR_TC_19_p1(request): def test_BGP_GR_TC_20_p1(request): """ - Test Objective : Verify that GR helper routers delete all the routes - received from a node if both the routers are configured as GR helper node. + Test Objective : Verify that GR helper routers delete all the routes + received from a node if both the routers are configured as GR helper node. """ tgen = get_topogen() tc_name = request.node.name @@ -3090,8 +3092,8 @@ def test_BGP_GR_TC_31_2_p1(request): def test_BGP_GR_TC_9_p1(request): """ - Test Objective : Verify that restarting nodes reset "F" bit while sending - the BGP open messages after it's restarts, when BGP GR is **NOT** enabled. + Test Objective : Verify that restarting nodes reset "F" bit while sending + the BGP open messages after it's restarts, when BGP GR is **NOT** enabled. """ tgen = get_topogen() @@ -3264,8 +3266,8 @@ def test_BGP_GR_TC_9_p1(request): def test_BGP_GR_TC_17_p1(request): """ - Test Objective : Verify that only GR helper routers keep the stale - route entries, not any GR disabled router. + Test Objective : Verify that only GR helper routers keep the stale + route entries, not any GR disabled router. """ tgen = get_topogen() @@ -3467,7 +3469,13 @@ def test_BGP_GR_TC_43_p1(request): step("Configure R1 and R2 as GR restarting node in global level") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, } @@ -3560,7 +3568,13 @@ def test_BGP_GR_TC_43_p1(request): step("Verify on R2 that R1 doesn't advertise any GR capabilities") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart-disable": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart-disable": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, } @@ -3659,7 +3673,13 @@ def test_BGP_GR_TC_43_p1(request): step("Verify on R2 that R1 advertises GR capabilities as a restarting node") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, } @@ -3779,7 +3799,13 @@ def test_BGP_GR_TC_44_p1(request): step("Verify on R2 that R1 advertises GR capabilities as a helper node") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart-helper": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, } @@ -3849,7 +3875,13 @@ def test_BGP_GR_TC_44_p1(request): start_router_daemons(tgen, "r2", ["bgpd"]) input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart-disable": True,}}} + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart-disable": True, + } + } + } } configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") @@ -3857,7 +3889,13 @@ def test_BGP_GR_TC_44_p1(request): step("Verify on R2 that R1 doesn't advertise any GR capabilities") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart-disable": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart-disable": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, } @@ -3941,7 +3979,13 @@ def test_BGP_GR_TC_44_p1(request): step("Verify on R2 that R1 advertises GR capabilities as a helper node") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart-helper": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, } @@ -4108,14 +4152,28 @@ def test_BGP_GR_TC_45_p1(request): start_router_daemons(tgen, "r1", ["bgpd"]) - input_dict = {"r1": {"bgp": {"graceful-restart": {"graceful-restart": False,}}}} + input_dict = { + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": False, + } + } + } + } configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") step("Verify on R2 that R1 advertises GR capabilities as a helper node") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart-helper": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, } @@ -4199,14 +4257,28 @@ def test_BGP_GR_TC_45_p1(request): start_router_daemons(tgen, "r2", ["bgpd"]) - input_dict = {"r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}} + input_dict = { + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + } + } + } + } configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") step("Verify on R2 that R1 advertises GR capabilities as a restarting node") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, } @@ -4307,7 +4379,9 @@ def test_BGP_GR_TC_46_p1(request): input_dict = { "r1": { "bgp": { - "graceful-restart": {"graceful-restart": True,}, + "graceful-restart": { + "graceful-restart": True, + }, "address_family": { "ipv4": { "unicast": { @@ -4559,7 +4633,9 @@ def test_BGP_GR_TC_47_p1(request): input_dict = { "r1": { "bgp": { - "graceful-restart": {"graceful-restart": True,}, + "graceful-restart": { + "graceful-restart": True, + }, "address_family": { "ipv4": { "unicast": { @@ -4698,7 +4774,13 @@ def test_BGP_GR_TC_47_p1(request): step("Verify on R2 that R1 still advertises GR capabilities as a restarting node") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, } @@ -4814,7 +4896,9 @@ def test_BGP_GR_TC_48_p1(request): input_dict = { "r1": { "bgp": { - "graceful-restart": {"graceful-restart": True,}, + "graceful-restart": { + "graceful-restart": True, + }, "address_family": { "ipv4": { "unicast": { @@ -4960,7 +5044,13 @@ def test_BGP_GR_TC_48_p1(request): step("Verify on R2 that R1 advertises GR capabilities as a restarting node") input_dict = { - "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + } + } + }, "r2": {"bgp": {"graceful-restart": {"graceful-restart-helper": True}}}, } diff --git a/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py b/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py index 6926121a6b..2ddeab13f6 100644 --- a/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py +++ b/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py @@ -456,7 +456,9 @@ def test_BGP_GR_TC_3_p0(request): input_dict = { "r1": { "bgp": { - "graceful-restart": {"disable-eor": True,}, + "graceful-restart": { + "disable-eor": True, + }, "address_family": { "ipv4": { "unicast": { @@ -2095,7 +2097,10 @@ def test_BGP_GR_chaos_33_p1(request): "ipv4": { "unicast": { "advertise_networks": [ - {"network": "200.0.20.1/32", "no_of_network": 2,} + { + "network": "200.0.20.1/32", + "no_of_network": 2, + } ] } }, @@ -2207,13 +2212,13 @@ def test_BGP_GR_chaos_33_p1(request): else: next_hop_6 = NEXT_HOP_6[1] - result = verify_rib(tgen, addr_type, dut, input_dict_2, next_hop_6, - expected=False) - assert result is not True,\ - "Testcase {} :Failed \n Error {}". \ - format(tc_name, result) - logger.info(" Expected behavior: {}".\ - format(result)) + result = verify_rib( + tgen, addr_type, dut, input_dict_2, next_hop_6, expected=False + ) + assert result is not True, "Testcase {} :Failed \n Error {}".format( + tc_name, result + ) + logger.info(" Expected behavior: {}".format(result)) logger.info("[Step 4] : Start BGPd daemon on R1 and R4..") @@ -3960,7 +3965,13 @@ def test_BGP_GR_21_p2(request): } } }, - "r2": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, + "r2": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + } + } + }, } configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") diff --git a/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py b/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py index 9c0355a3e9..c2858a4bd0 100644 --- a/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py +++ b/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py @@ -2132,7 +2132,11 @@ def test_large_community_lists_with_rmap_match_regex(request): { "action": "permit", "seq_id": "10", - "match": {"large_community_list": {"id": "ALL",},}, + "match": { + "large_community_list": { + "id": "ALL", + }, + }, } ] } @@ -2208,7 +2212,11 @@ def test_large_community_lists_with_rmap_match_regex(request): { "action": "permit", "seq_id": "20", - "match": {"large_community_list": {"id": "EXP_ALL",},}, + "match": { + "large_community_list": { + "id": "EXP_ALL", + }, + }, } ] } diff --git a/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py b/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py index 9703cf8d57..d34446e2ee 100644 --- a/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py +++ b/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py @@ -95,7 +95,7 @@ from lib.common_config import ( kill_router_daemons, start_router_daemons, stop_router, - start_router + start_router, ) from lib.topolog import logger @@ -129,7 +129,7 @@ LOOPBACK_2 = { "ipv4": "20.20.20.20/32", "ipv6": "20::20:20/128", "ipv4_mask": "255.255.255.255", - "ipv6_mask": None + "ipv6_mask": None, } MAX_PATHS = 2 @@ -724,16 +724,40 @@ def test_vrf_with_multiple_links_p1(request): "local_as": "200", "vrf": "RED_A", "address_family": { - "ipv4": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}}, - "ipv6": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}}, + "ipv4": { + "unicast": { + "maximum_paths": { + "ebgp": MAX_PATHS, + } + } + }, + "ipv6": { + "unicast": { + "maximum_paths": { + "ebgp": MAX_PATHS, + } + } + }, }, }, { "local_as": "200", "vrf": "BLUE_A", "address_family": { - "ipv4": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}}, - "ipv6": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}}, + "ipv4": { + "unicast": { + "maximum_paths": { + "ebgp": MAX_PATHS, + } + } + }, + "ipv6": { + "unicast": { + "maximum_paths": { + "ebgp": MAX_PATHS, + } + } + }, }, }, ] @@ -2148,7 +2172,7 @@ def test_restart_bgpd_daemon_p1(request): assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) result = verify_bgp_convergence(tgen, topo) - assert result is True, "Testcase () :Failed\n Error {}". format(tc_name, result) + assert result is True, "Testcase () :Failed\n Error {}".format(tc_name, result) step("Kill BGPd daemon on R1.") kill_router_daemons(tgen, "r1", ["bgpd"]) diff --git a/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py b/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py index 6d7131e1e5..ceac84709b 100644 --- a/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py +++ b/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py @@ -101,7 +101,11 @@ def test_r1_receive_and_advertise_prefix_sid_type1(): "prefix": prefix, "advertisedTo": {"10.0.0.101": {}, "10.0.0.102": {}}, "paths": [ - {"valid": True, "remoteLabel": remoteLabel, "labelIndex": labelIndex,} + { + "valid": True, + "remoteLabel": remoteLabel, + "labelIndex": labelIndex, + } ], } return topotest.json_cmp(output, expected) diff --git a/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py b/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py index 3af944473d..3f9009967d 100644 --- a/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py +++ b/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py @@ -1042,9 +1042,10 @@ def test_next_hop_with_recursive_lookup_p1(request): next_hop=next_hop, expected=False, ) - assert result is not True, ( - "Testcase {} : Failed \n " - "Route is still present \n Error : {}".format(tc_name, result) + assert ( + result is not True + ), "Testcase {} : Failed \n " "Route is still present \n Error : {}".format( + tc_name, result ) step("Re-apply redistribution on R4.") @@ -1125,9 +1126,10 @@ def test_next_hop_with_recursive_lookup_p1(request): next_hop=next_hop, expected=False, ) - assert result is not True, ( - "Testcase {} : Failed \n " - "Route is still present \n Error : {}".format(tc_name, result) + assert ( + result is not True + ), "Testcase {} : Failed \n " "Route is still present \n Error : {}".format( + tc_name, result ) shutdown_bringup_interface(tgen, "r3", intf_r3_r4, True) @@ -1182,9 +1184,10 @@ def test_next_hop_with_recursive_lookup_p1(request): next_hop=next_hop, expected=False, ) - assert result is not True, ( - "Testcase {} : Failed \n " - "Route is still present \n Error : {}".format(tc_name, result) + assert ( + result is not True + ), "Testcase {} : Failed \n " "Route is still present \n Error : {}".format( + tc_name, result ) shutdown_bringup_interface(tgen, "r4", intf_r4_r3, True) @@ -2101,8 +2104,20 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request): "r4": { "bgp": { "address_family": { - "ipv4": {"unicast": {"maximum_paths": {"ebgp": 1,}}}, - "ipv6": {"unicast": {"maximum_paths": {"ebgp": 1,}}}, + "ipv4": { + "unicast": { + "maximum_paths": { + "ebgp": 1, + } + } + }, + "ipv6": { + "unicast": { + "maximum_paths": { + "ebgp": 1, + } + } + }, } } } @@ -2131,8 +2146,20 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request): "r4": { "bgp": { "address_family": { - "ipv4": {"unicast": {"maximum_paths": {"ebgp": 2,}}}, - "ipv6": {"unicast": {"maximum_paths": {"ebgp": 2,}}}, + "ipv4": { + "unicast": { + "maximum_paths": { + "ebgp": 2, + } + } + }, + "ipv6": { + "unicast": { + "maximum_paths": { + "ebgp": 2, + } + } + }, } } } diff --git a/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py b/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py index 0fabd90341..0467bf1bfb 100644 --- a/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py +++ b/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py @@ -415,9 +415,10 @@ def test_route_summarisation_with_summary_only_p1(request): result = verify_rib( tgen, addr_type, "r3", input_static, protocol="bgp", expected=False ) - assert result is not True, ( - "Testcase : Failed \n " - "Routes are still present \n Error: {}".format(tc_name, result) + assert ( + result is not True + ), "Testcase : Failed \n " "Routes are still present \n Error: {}".format( + tc_name, result ) result = verify_rib(tgen, addr_type, "r1", input_static_agg, protocol="bgp") @@ -614,7 +615,9 @@ def test_route_summarisation_with_summary_only_p1(request): addr_type: { "unicast": { "advertise_networks": [ - {"network": NETWORK_4_1[addr_type],} + { + "network": NETWORK_4_1[addr_type], + } ] } } @@ -1014,7 +1017,11 @@ def test_route_summarisation_with_as_set_p1(request): assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result) for addr_type in ADDR_TYPES: - for pfx, seq_id, network, in zip([6, 7], [60, 70], [NETWORK_3_1, NETWORK_4_1]): + for ( + pfx, + seq_id, + network, + ) in zip([6, 7], [60, 70], [NETWORK_3_1, NETWORK_4_1]): prefix_list = { "r1": { "prefix_lists": { diff --git a/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py b/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py index cf8be5f44f..c75055c26f 100644 --- a/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py +++ b/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py @@ -56,6 +56,7 @@ class TemplateTopo(Topo): switch.add_link(tgen.gears["r2"]) switch.add_link(tgen.gears["r3"]) + def setup_module(mod): tgen = Topogen(TemplateTopo, mod.__name__) tgen.start_topology() @@ -114,6 +115,7 @@ def test_bgp_route(): assertmsg = '"r3" JSON output mismatches' assert result is None, assertmsg + if __name__ == "__main__": args = ["-s"] + sys.argv[1:] sys.exit(pytest.main(args)) diff --git a/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py b/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py index 9106c163cd..6e7495d929 100644 --- a/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py +++ b/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py @@ -943,9 +943,10 @@ def test_modify_route_map_match_set_clauses_p1(request): } result = verify_bgp_rib(tgen, addr_type, "r1", input_routes_r1, expected=False) - assert result is not True, ( - "Testcase {} : Failed \n Error : Routes are still " - "present {}".format(tc_name, result) + assert ( + result is not True + ), "Testcase {} : Failed \n Error : Routes are still " "present {}".format( + tc_name, result ) write_test_footer(tc_name) diff --git a/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py b/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py index 67edcae90e..6f80ffd1aa 100755 --- a/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py +++ b/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py @@ -62,7 +62,7 @@ from functools import partial # Save the Current Working Directory to find configuration files. CWD = os.path.dirname(os.path.realpath(__file__)) -sys.path.append(os.path.join(CWD, '../')) +sys.path.append(os.path.join(CWD, "../")) # pylint: disable=C0413 # Import topogen and topotest helpers @@ -76,8 +76,10 @@ from mininet.topo import Topo # Global multi-dimensional dictionary containing all expected outputs outputs = {} + class TemplateTopo(Topo): "Test topology builder" + def build(self, *_args, **_opts): "Build function" tgen = get_topogen(self) @@ -85,59 +87,58 @@ class TemplateTopo(Topo): # # Define FRR Routers # - for router in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']: + for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]: tgen.add_router(router) # # Define connections # - switch = tgen.add_switch('s1') - switch.add_link(tgen.gears['rt1'], nodeif="eth-rt2") - switch.add_link(tgen.gears['rt2'], nodeif="eth-rt1") - switch = tgen.add_switch('s2') - switch.add_link(tgen.gears['rt2'], nodeif="eth-rt3") - switch.add_link(tgen.gears['rt3'], nodeif="eth-rt2") - switch = tgen.add_switch('s3') - switch.add_link(tgen.gears['rt1'], nodeif="eth-rt3") - switch.add_link(tgen.gears['rt3'], nodeif="eth-rt1") - switch = tgen.add_switch('s4') - switch.add_link(tgen.gears['rt1'], nodeif="eth-rt4") - switch.add_link(tgen.gears['rt4'], nodeif="eth-rt1") - switch = tgen.add_switch('s5') - switch.add_link(tgen.gears['rt1'], nodeif="eth-rt5") - switch.add_link(tgen.gears['rt5'], nodeif="eth-rt1") - switch = tgen.add_switch('s6') - switch.add_link(tgen.gears['rt1'], nodeif="eth-rt6") - switch.add_link(tgen.gears['rt6'], nodeif="eth-rt1") - switch = tgen.add_switch('s7') - switch.add_link(tgen.gears['rt2'], nodeif="eth-rt7") - switch.add_link(tgen.gears['rt7'], nodeif="eth-rt2") - switch = tgen.add_switch('s8') - switch.add_link(tgen.gears['rt3'], nodeif="eth-rt7") - switch.add_link(tgen.gears['rt7'], nodeif="eth-rt3") - switch = tgen.add_switch('s9') - switch.add_link(tgen.gears['rt4'], nodeif="eth-rt7") - switch.add_link(tgen.gears['rt7'], nodeif="eth-rt4") - switch = tgen.add_switch('s10') - switch.add_link(tgen.gears['rt5'], nodeif="eth-rt7") - switch.add_link(tgen.gears['rt7'], nodeif="eth-rt5") - switch = tgen.add_switch('s11') - switch.add_link(tgen.gears['rt6'], nodeif="eth-rt7") - switch.add_link(tgen.gears['rt7'], nodeif="eth-rt6") + switch = tgen.add_switch("s1") + switch.add_link(tgen.gears["rt1"], nodeif="eth-rt2") + switch.add_link(tgen.gears["rt2"], nodeif="eth-rt1") + switch = tgen.add_switch("s2") + switch.add_link(tgen.gears["rt2"], nodeif="eth-rt3") + switch.add_link(tgen.gears["rt3"], nodeif="eth-rt2") + switch = tgen.add_switch("s3") + switch.add_link(tgen.gears["rt1"], nodeif="eth-rt3") + switch.add_link(tgen.gears["rt3"], nodeif="eth-rt1") + switch = tgen.add_switch("s4") + switch.add_link(tgen.gears["rt1"], nodeif="eth-rt4") + switch.add_link(tgen.gears["rt4"], nodeif="eth-rt1") + switch = tgen.add_switch("s5") + switch.add_link(tgen.gears["rt1"], nodeif="eth-rt5") + switch.add_link(tgen.gears["rt5"], nodeif="eth-rt1") + switch = tgen.add_switch("s6") + switch.add_link(tgen.gears["rt1"], nodeif="eth-rt6") + switch.add_link(tgen.gears["rt6"], nodeif="eth-rt1") + switch = tgen.add_switch("s7") + switch.add_link(tgen.gears["rt2"], nodeif="eth-rt7") + switch.add_link(tgen.gears["rt7"], nodeif="eth-rt2") + switch = tgen.add_switch("s8") + switch.add_link(tgen.gears["rt3"], nodeif="eth-rt7") + switch.add_link(tgen.gears["rt7"], nodeif="eth-rt3") + switch = tgen.add_switch("s9") + switch.add_link(tgen.gears["rt4"], nodeif="eth-rt7") + switch.add_link(tgen.gears["rt7"], nodeif="eth-rt4") + switch = tgen.add_switch("s10") + switch.add_link(tgen.gears["rt5"], nodeif="eth-rt7") + switch.add_link(tgen.gears["rt7"], nodeif="eth-rt5") + switch = tgen.add_switch("s11") + switch.add_link(tgen.gears["rt6"], nodeif="eth-rt7") + switch.add_link(tgen.gears["rt7"], nodeif="eth-rt6") # # Populate multi-dimensional dictionary containing all expected outputs # - files = ["show_ipv6_route.ref", - "show_yang_interface_isis_adjacencies.ref"] - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']: + files = ["show_ipv6_route.ref", "show_yang_interface_isis_adjacencies.ref"] + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]: outputs[rname] = {} for step in range(1, 13 + 1): outputs[rname][step] = {} for file in files: if step == 1: # Get snapshots relative to the expected initial network convergence - filename = '{}/{}/step{}/{}'.format(CWD, rname, step, file) + filename = "{}/{}/step{}/{}".format(CWD, rname, step, file) outputs[rname][step][file] = open(filename).read() else: if rname != "rt1": @@ -146,20 +147,23 @@ class TemplateTopo(Topo): continue # Get diff relative to the previous step - filename = '{}/{}/step{}/{}.diff'.format(CWD, rname, step, file) + filename = "{}/{}/step{}/{}.diff".format(CWD, rname, step, file) # Create temporary files in order to apply the diff f_in = tempfile.NamedTemporaryFile() f_in.write(outputs[rname][step - 1][file]) f_in.flush() f_out = tempfile.NamedTemporaryFile() - os.system("patch -s -o %s %s %s" %(f_out.name, f_in.name, filename)) + os.system( + "patch -s -o %s %s %s" % (f_out.name, f_in.name, filename) + ) # Store the updated snapshot and remove the temporary files outputs[rname][step][file] = open(f_out.name).read() f_in.close() f_out.close() + def setup_module(mod): "Sets up the pytest environment" tgen = Topogen(TemplateTopo, mod.__name__) @@ -170,16 +174,15 @@ def setup_module(mod): # For all registered routers, load the zebra configuration file for rname, router in router_list.iteritems(): router.load_config( - TopoRouter.RD_ZEBRA, - os.path.join(CWD, '{}/zebra.conf'.format(rname)) + TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname)) ) router.load_config( - TopoRouter.RD_ISIS, - os.path.join(CWD, '{}/isisd.conf'.format(rname)) + TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname)) ) tgen.start_router() + def teardown_module(mod): "Teardown the pytest environment" tgen = get_topogen() @@ -187,6 +190,7 @@ def teardown_module(mod): # This function tears down the whole topology. tgen.stop_topology() + def router_compare_json_output(rname, command, reference): "Compare router JSON output" @@ -196,12 +200,12 @@ def router_compare_json_output(rname, command, reference): expected = json.loads(reference) # Run test function until we get an result. Wait at most 60 seconds. - test_func = partial(topotest.router_json_cmp, - tgen.gears[rname], command, expected) + test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected) _, diff = topotest.run_and_expect(test_func, None, count=120, wait=0.5) assertmsg = '"{}" JSON output mismatches the expected result'.format(rname) assert diff is None, assertmsg + # # Step 1 # @@ -215,9 +219,13 @@ def test_isis_adjacencies_step1(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']: - router_compare_json_output(rname, "show yang operational-data /frr-interface:lib isisd", - outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]: + router_compare_json_output( + rname, + "show yang operational-data /frr-interface:lib isisd", + outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"], + ) + def test_rib_ipv6_step1(): logger.info("Test (step 1): verify IPv6 RIB") @@ -227,9 +235,11 @@ def test_rib_ipv6_step1(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][1]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][1]["show_ipv6_route.ref"] + ) + # # Step 2 @@ -248,16 +258,28 @@ def test_rib_ipv6_step2(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Disabling LFA protection on all rt1 interfaces') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt3" -c "no isis fast-reroute lfa"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt4" -c "no isis fast-reroute lfa"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt5" -c "no isis fast-reroute lfa"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt6" -c "no isis fast-reroute lfa"') + logger.info("Disabling LFA protection on all rt1 interfaces") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt3" -c "no isis fast-reroute lfa"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt4" -c "no isis fast-reroute lfa"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt5" -c "no isis fast-reroute lfa"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt6" -c "no isis fast-reroute lfa"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][2]["show_ipv6_route.ref"] + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][2]["show_ipv6_route.ref"]) # # Step 3 @@ -276,16 +298,28 @@ def test_rib_ipv6_step3(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Re-enabling LFA protection on all rt1 interfaces') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt3" -c "isis fast-reroute lfa"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt4" -c "isis fast-reroute lfa"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt5" -c "isis fast-reroute lfa"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt6" -c "isis fast-reroute lfa"') + logger.info("Re-enabling LFA protection on all rt1 interfaces") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt3" -c "isis fast-reroute lfa"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt4" -c "isis fast-reroute lfa"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt5" -c "isis fast-reroute lfa"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt6" -c "isis fast-reroute lfa"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][3]["show_ipv6_route.ref"] + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][3]["show_ipv6_route.ref"]) # # Step 4 @@ -304,12 +338,16 @@ def test_rib_ipv6_step4(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Disabling LFA load-sharing on rt1') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute load-sharing disable"') + logger.info("Disabling LFA load-sharing on rt1") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute load-sharing disable"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][4]["show_ipv6_route.ref"] + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][4]["show_ipv6_route.ref"]) # # Step 5 @@ -328,12 +366,16 @@ def test_rib_ipv6_step5(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Re-enabling LFA load-sharing on rt1') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute load-sharing disable"') + logger.info("Re-enabling LFA load-sharing on rt1") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute load-sharing disable"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][5]["show_ipv6_route.ref"] + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][5]["show_ipv6_route.ref"]) # # Step 6 @@ -352,12 +394,16 @@ def test_rib_ipv6_step6(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Limiting backup computation to critical priority prefixes only') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute priority-limit critical"') + logger.info("Limiting backup computation to critical priority prefixes only") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute priority-limit critical"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][6]["show_ipv6_route.ref"] + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][6]["show_ipv6_route.ref"]) # # Step 7 @@ -377,13 +423,19 @@ def test_rib_ipv6_step7(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Configuring a prefix priority list') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "spf prefix-priority critical CRITICAL_DESTINATIONS"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"') + logger.info("Configuring a prefix priority list") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "spf prefix-priority critical CRITICAL_DESTINATIONS"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][7]["show_ipv6_route.ref"] + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][7]["show_ipv6_route.ref"]) # # Step 8 @@ -402,14 +454,22 @@ def test_rib_ipv6_step8(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Reverting previous changes related to prefix priorities') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "no ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute priority-limit critical"') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no spf prefix-priority critical CRITICAL_DESTINATIONS"') + logger.info("Reverting previous changes related to prefix priorities") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "no ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute priority-limit critical"' + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "no spf prefix-priority critical CRITICAL_DESTINATIONS"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][8]["show_ipv6_route.ref"] + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][8]["show_ipv6_route.ref"]) # # Step 9 @@ -428,12 +488,16 @@ def test_rib_ipv6_step9(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Excluding eth-rt6 from LFA computation for eth-rt2\'s failure') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa exclude interface eth-rt6"') + logger.info("Excluding eth-rt6 from LFA computation for eth-rt2's failure") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa exclude interface eth-rt6"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][9]["show_ipv6_route.ref"] + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][9]["show_ipv6_route.ref"]) # # Step 10 @@ -452,12 +516,20 @@ def test_rib_ipv6_step10(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Removing exclusion of eth-rt6 from LFA computation for eth-rt2\'s failure') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa exclude interface eth-rt6"') + logger.info( + "Removing exclusion of eth-rt6 from LFA computation for eth-rt2's failure" + ) + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa exclude interface eth-rt6"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, + "show ipv6 route isis json", + outputs[rname][10]["show_ipv6_route.ref"], + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][10]["show_ipv6_route.ref"]) # # Step 11 @@ -476,12 +548,18 @@ def test_rib_ipv6_step11(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Adding LFA tiebreaker: prefer node protecting backup path') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker node-protecting index 10"') + logger.info("Adding LFA tiebreaker: prefer node protecting backup path") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker node-protecting index 10"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, + "show ipv6 route isis json", + outputs[rname][11]["show_ipv6_route.ref"], + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][11]["show_ipv6_route.ref"]) # # Step 12 @@ -500,12 +578,18 @@ def test_rib_ipv6_step12(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Adding LFA tiebreaker: prefer backup path via downstream node') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker downstream index 20"') + logger.info("Adding LFA tiebreaker: prefer backup path via downstream node") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker downstream index 20"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, + "show ipv6 route isis json", + outputs[rname][12]["show_ipv6_route.ref"], + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][12]["show_ipv6_route.ref"]) # # Step 13 @@ -524,22 +608,29 @@ def test_rib_ipv6_step13(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Adding LFA tiebreaker: prefer backup path with lowest total metric') - tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker lowest-backup-metric index 30"') + logger.info("Adding LFA tiebreaker: prefer backup path with lowest total metric") + tgen.net["rt1"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker lowest-backup-metric index 30"' + ) + + for rname in ["rt1"]: + router_compare_json_output( + rname, + "show ipv6 route isis json", + outputs[rname][13]["show_ipv6_route.ref"], + ) - for rname in ['rt1']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][13]["show_ipv6_route.ref"]) # Memory leak test template def test_memory_leak(): "Run the memory leak test and report results." tgen = get_topogen() if not tgen.is_memleak_enabled(): - pytest.skip('Memory leak test/report is disabled') + pytest.skip("Memory leak test/report is disabled") tgen.report_memory_leaks() -if __name__ == '__main__': + +if __name__ == "__main__": args = ["-s"] + sys.argv[1:] sys.exit(pytest.main(args)) diff --git a/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py b/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py index 514ea53552..a1263de8ad 100755 --- a/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py +++ b/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py @@ -74,7 +74,7 @@ from functools import partial # Save the Current Working Directory to find configuration files. CWD = os.path.dirname(os.path.realpath(__file__)) -sys.path.append(os.path.join(CWD, '../')) +sys.path.append(os.path.join(CWD, "../")) # pylint: disable=C0413 # Import topogen and topotest helpers @@ -88,8 +88,10 @@ from mininet.topo import Topo # Global multi-dimensional dictionary containing all expected outputs outputs = {} + class TemplateTopo(Topo): "Test topology builder" + def build(self, *_args, **_opts): "Build function" tgen = get_topogen(self) @@ -97,80 +99,85 @@ class TemplateTopo(Topo): # # Define FRR Routers # - for router in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: + for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: tgen.add_router(router) # # Define connections # - switch = tgen.add_switch('s1') - switch.add_link(tgen.gears['rt1'], nodeif="eth-sw1") - switch.add_link(tgen.gears['rt2'], nodeif="eth-sw1") - switch.add_link(tgen.gears['rt3'], nodeif="eth-sw1") + switch = tgen.add_switch("s1") + switch.add_link(tgen.gears["rt1"], nodeif="eth-sw1") + switch.add_link(tgen.gears["rt2"], nodeif="eth-sw1") + switch.add_link(tgen.gears["rt3"], nodeif="eth-sw1") - switch = tgen.add_switch('s2') - switch.add_link(tgen.gears['rt2'], nodeif="eth-rt4-1") - switch.add_link(tgen.gears['rt4'], nodeif="eth-rt2-1") + switch = tgen.add_switch("s2") + switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-1") + switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-1") - switch = tgen.add_switch('s3') - switch.add_link(tgen.gears['rt2'], nodeif="eth-rt4-2") - switch.add_link(tgen.gears['rt4'], nodeif="eth-rt2-2") + switch = tgen.add_switch("s3") + switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-2") + switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-2") - switch = tgen.add_switch('s4') - switch.add_link(tgen.gears['rt3'], nodeif="eth-rt5-1") - switch.add_link(tgen.gears['rt5'], nodeif="eth-rt3-1") + switch = tgen.add_switch("s4") + switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-1") + switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-1") - switch = tgen.add_switch('s5') - switch.add_link(tgen.gears['rt3'], nodeif="eth-rt5-2") - switch.add_link(tgen.gears['rt5'], nodeif="eth-rt3-2") + switch = tgen.add_switch("s5") + switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-2") + switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-2") - switch = tgen.add_switch('s6') - switch.add_link(tgen.gears['rt4'], nodeif="eth-rt5") - switch.add_link(tgen.gears['rt5'], nodeif="eth-rt4") + switch = tgen.add_switch("s6") + switch.add_link(tgen.gears["rt4"], nodeif="eth-rt5") + switch.add_link(tgen.gears["rt5"], nodeif="eth-rt4") - switch = tgen.add_switch('s7') - switch.add_link(tgen.gears['rt4'], nodeif="eth-rt6") - switch.add_link(tgen.gears['rt6'], nodeif="eth-rt4") + switch = tgen.add_switch("s7") + switch.add_link(tgen.gears["rt4"], nodeif="eth-rt6") + switch.add_link(tgen.gears["rt6"], nodeif="eth-rt4") - switch = tgen.add_switch('s8') - switch.add_link(tgen.gears['rt5'], nodeif="eth-rt6") - switch.add_link(tgen.gears['rt6'], nodeif="eth-rt5") + switch = tgen.add_switch("s8") + switch.add_link(tgen.gears["rt5"], nodeif="eth-rt6") + switch.add_link(tgen.gears["rt6"], nodeif="eth-rt5") # # Populate multi-dimensional dictionary containing all expected outputs # - files = ["show_ip_route.ref", - "show_ipv6_route.ref", - "show_mpls_table.ref", - "show_yang_interface_isis_adjacencies.ref"] - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: + files = [ + "show_ip_route.ref", + "show_ipv6_route.ref", + "show_mpls_table.ref", + "show_yang_interface_isis_adjacencies.ref", + ] + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: outputs[rname] = {} for step in range(1, 9 + 1): outputs[rname][step] = {} for file in files: if step == 1: # Get snapshots relative to the expected initial network convergence - filename = '{}/{}/step{}/{}'.format(CWD, rname, step, file) + filename = "{}/{}/step{}/{}".format(CWD, rname, step, file) outputs[rname][step][file] = open(filename).read() else: if file == "show_yang_interface_isis_adjacencies.ref": continue # Get diff relative to the previous step - filename = '{}/{}/step{}/{}.diff'.format(CWD, rname, step, file) + filename = "{}/{}/step{}/{}.diff".format(CWD, rname, step, file) # Create temporary files in order to apply the diff f_in = tempfile.NamedTemporaryFile() f_in.write(outputs[rname][step - 1][file]) f_in.flush() f_out = tempfile.NamedTemporaryFile() - os.system("patch -s -o %s %s %s" %(f_out.name, f_in.name, filename)) + os.system( + "patch -s -o %s %s %s" % (f_out.name, f_in.name, filename) + ) # Store the updated snapshot and remove the temporary files outputs[rname][step][file] = open(f_out.name).read() f_in.close() f_out.close() + def setup_module(mod): "Sets up the pytest environment" tgen = Topogen(TemplateTopo, mod.__name__) @@ -181,16 +188,15 @@ def setup_module(mod): # For all registered routers, load the zebra configuration file for rname, router in router_list.items(): router.load_config( - TopoRouter.RD_ZEBRA, - os.path.join(CWD, '{}/zebra.conf'.format(rname)) + TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname)) ) router.load_config( - TopoRouter.RD_ISIS, - os.path.join(CWD, '{}/isisd.conf'.format(rname)) + TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname)) ) tgen.start_router() + def teardown_module(mod): "Teardown the pytest environment" tgen = get_topogen() @@ -198,6 +204,7 @@ def teardown_module(mod): # This function tears down the whole topology. tgen.stop_topology() + def router_compare_json_output(rname, command, reference): "Compare router JSON output" @@ -207,12 +214,12 @@ def router_compare_json_output(rname, command, reference): expected = json.loads(reference) # Run test function until we get an result. Wait at most 60 seconds. - test_func = partial(topotest.router_json_cmp, - tgen.gears[rname], command, expected) + test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected) _, diff = topotest.run_and_expect(test_func, None, count=120, wait=0.5) assertmsg = '"{}" JSON output mismatches the expected result'.format(rname) assert diff is None, assertmsg + # # Step 1 # @@ -226,9 +233,13 @@ def test_isis_adjacencies_step1(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show yang operational-data /frr-interface:lib isisd", - outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, + "show yang operational-data /frr-interface:lib isisd", + outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"], + ) + def test_rib_ipv4_step1(): logger.info("Test (step 1): verify IPv4 RIB") @@ -238,9 +249,11 @@ def test_rib_ipv4_step1(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ip route isis json", - outputs[rname][1]["show_ip_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ip route isis json", outputs[rname][1]["show_ip_route.ref"] + ) + def test_rib_ipv6_step1(): logger.info("Test (step 1): verify IPv6 RIB") @@ -250,9 +263,11 @@ def test_rib_ipv6_step1(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][1]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][1]["show_ipv6_route.ref"] + ) + def test_mpls_lib_step1(): logger.info("Test (step 1): verify MPLS LIB") @@ -262,9 +277,11 @@ def test_mpls_lib_step1(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show mpls table json", - outputs[rname][1]["show_mpls_table.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show mpls table json", outputs[rname][1]["show_mpls_table.ref"] + ) + # # Step 2 @@ -283,12 +300,16 @@ def test_rib_ipv4_step2(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Disabling TI-LFA link protection on rt2\'s eth-sw1 interface') - tgen.net['rt2'].cmd('vtysh -c "conf t" -c "interface eth-sw1" -c "no isis fast-reroute ti-lfa"') + logger.info("Disabling TI-LFA link protection on rt2's eth-sw1 interface") + tgen.net["rt2"].cmd( + 'vtysh -c "conf t" -c "interface eth-sw1" -c "no isis fast-reroute ti-lfa"' + ) + + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ip route isis json", outputs[rname][2]["show_ip_route.ref"] + ) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ip route isis json", - outputs[rname][2]["show_ip_route.ref"]) def test_rib_ipv6_step2(): logger.info("Test (step 2): verify IPv6 RIB") @@ -298,9 +319,11 @@ def test_rib_ipv6_step2(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][2]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][2]["show_ipv6_route.ref"] + ) + def test_mpls_lib_step2(): logger.info("Test (step 2): verify MPLS LIB") @@ -310,9 +333,11 @@ def test_mpls_lib_step2(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show mpls table json", - outputs[rname][2]["show_mpls_table.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show mpls table json", outputs[rname][2]["show_mpls_table.ref"] + ) + # # Step 3 @@ -331,12 +356,16 @@ def test_rib_ipv4_step3(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Enabling TI-LFA link protection on rt2\'s eth-sw1 interface') - tgen.net['rt2'].cmd('vtysh -c "conf t" -c "interface eth-sw1" -c "isis fast-reroute ti-lfa"') + logger.info("Enabling TI-LFA link protection on rt2's eth-sw1 interface") + tgen.net["rt2"].cmd( + 'vtysh -c "conf t" -c "interface eth-sw1" -c "isis fast-reroute ti-lfa"' + ) + + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ip route isis json", outputs[rname][3]["show_ip_route.ref"] + ) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ip route isis json", - outputs[rname][3]["show_ip_route.ref"]) def test_rib_ipv6_step3(): logger.info("Test (step 3): verify IPv6 RIB") @@ -346,9 +375,11 @@ def test_rib_ipv6_step3(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][3]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][3]["show_ipv6_route.ref"] + ) + def test_mpls_lib_step3(): logger.info("Test (step 3): verify MPLS LIB") @@ -358,9 +389,11 @@ def test_mpls_lib_step3(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show mpls table json", - outputs[rname][3]["show_mpls_table.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show mpls table json", outputs[rname][3]["show_mpls_table.ref"] + ) + # # Step 4 @@ -384,12 +417,16 @@ def test_rib_ipv4_step4(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Disabling SR on rt4') - tgen.net['rt4'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no segment-routing on"') + logger.info("Disabling SR on rt4") + tgen.net["rt4"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "no segment-routing on"' + ) + + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ip route isis json", outputs[rname][4]["show_ip_route.ref"] + ) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ip route isis json", - outputs[rname][4]["show_ip_route.ref"]) def test_rib_ipv6_step4(): logger.info("Test (step 4): verify IPv6 RIB") @@ -399,9 +436,11 @@ def test_rib_ipv6_step4(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][4]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][4]["show_ipv6_route.ref"] + ) + def test_mpls_lib_step4(): logger.info("Test (step 4): verify MPLS LIB") @@ -411,9 +450,11 @@ def test_mpls_lib_step4(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show mpls table json", - outputs[rname][4]["show_mpls_table.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show mpls table json", outputs[rname][4]["show_mpls_table.ref"] + ) + # # Step 5 @@ -432,12 +473,14 @@ def test_rib_ipv4_step5(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Enabling SR on rt4') - tgen.net['rt4'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing on"') + logger.info("Enabling SR on rt4") + tgen.net["rt4"].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing on"') + + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ip route isis json", outputs[rname][5]["show_ip_route.ref"] + ) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ip route isis json", - outputs[rname][5]["show_ip_route.ref"]) def test_rib_ipv6_step5(): logger.info("Test (step 5): verify IPv6 RIB") @@ -447,9 +490,11 @@ def test_rib_ipv6_step5(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][5]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][5]["show_ipv6_route.ref"] + ) + def test_mpls_lib_step5(): logger.info("Test (step 5): verify MPLS LIB") @@ -459,9 +504,11 @@ def test_mpls_lib_step5(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show mpls table json", - outputs[rname][5]["show_mpls_table.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show mpls table json", outputs[rname][5]["show_mpls_table.ref"] + ) + # # Step 6 @@ -480,12 +527,16 @@ def test_rib_ipv4_step6(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Changing rt5\'s SRGB') - tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing global-block 30000 37999"') + logger.info("Changing rt5's SRGB") + tgen.net["rt5"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing global-block 30000 37999"' + ) + + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ip route isis json", outputs[rname][6]["show_ip_route.ref"] + ) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ip route isis json", - outputs[rname][6]["show_ip_route.ref"]) def test_rib_ipv6_step6(): logger.info("Test (step 6): verify IPv6 RIB") @@ -495,9 +546,11 @@ def test_rib_ipv6_step6(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][6]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][6]["show_ipv6_route.ref"] + ) + def test_mpls_lib_step6(): logger.info("Test (step 6): verify MPLS LIB") @@ -507,9 +560,11 @@ def test_mpls_lib_step6(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show mpls table json", - outputs[rname][6]["show_mpls_table.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show mpls table json", outputs[rname][6]["show_mpls_table.ref"] + ) + # # Step 7 @@ -529,13 +584,19 @@ def test_rib_ipv4_step7(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Deleting rt5\'s Prefix-SIDs') - tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 5.5.5.5/32 index 50"') - tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 2001:db8:1000::5/128 index 51"') + logger.info("Deleting rt5's Prefix-SIDs") + tgen.net["rt5"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 5.5.5.5/32 index 50"' + ) + tgen.net["rt5"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 2001:db8:1000::5/128 index 51"' + ) + + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ip route isis json", outputs[rname][7]["show_ip_route.ref"] + ) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ip route isis json", - outputs[rname][7]["show_ip_route.ref"]) def test_rib_ipv6_step7(): logger.info("Test (step 7): verify IPv6 RIB") @@ -545,9 +606,11 @@ def test_rib_ipv6_step7(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][7]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][7]["show_ipv6_route.ref"] + ) + def test_mpls_lib_step7(): logger.info("Test (step 7): verify MPLS LIB") @@ -557,9 +620,11 @@ def test_mpls_lib_step7(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show mpls table json", - outputs[rname][7]["show_mpls_table.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show mpls table json", outputs[rname][7]["show_mpls_table.ref"] + ) + # # Step 8 @@ -578,13 +643,19 @@ def test_rib_ipv4_step8(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Re-adding rt5\'s Prefix-SIDs') - tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 50"') - tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 51"') + logger.info("Re-adding rt5's Prefix-SIDs") + tgen.net["rt5"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 50"' + ) + tgen.net["rt5"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 51"' + ) + + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ip route isis json", outputs[rname][8]["show_ip_route.ref"] + ) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ip route isis json", - outputs[rname][8]["show_ip_route.ref"]) def test_rib_ipv6_step8(): logger.info("Test (step 8): verify IPv6 RIB") @@ -594,9 +665,11 @@ def test_rib_ipv6_step8(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][8]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][8]["show_ipv6_route.ref"] + ) + def test_mpls_lib_step8(): logger.info("Test (step 8): verify MPLS LIB") @@ -606,9 +679,11 @@ def test_mpls_lib_step8(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show mpls table json", - outputs[rname][8]["show_mpls_table.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show mpls table json", outputs[rname][8]["show_mpls_table.ref"] + ) + # # Step 9 @@ -628,13 +703,19 @@ def test_rib_ipv4_step9(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - logger.info('Re-adding rt5\'s Prefix-SIDs') - tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 500"') - tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 501"') + logger.info("Re-adding rt5's Prefix-SIDs") + tgen.net["rt5"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 500"' + ) + tgen.net["rt5"].cmd( + 'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 501"' + ) + + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ip route isis json", outputs[rname][9]["show_ip_route.ref"] + ) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ip route isis json", - outputs[rname][9]["show_ip_route.ref"]) def test_rib_ipv6_step9(): logger.info("Test (step 9): verify IPv6 RIB") @@ -644,9 +725,11 @@ def test_rib_ipv6_step9(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show ipv6 route isis json", - outputs[rname][9]["show_ipv6_route.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show ipv6 route isis json", outputs[rname][9]["show_ipv6_route.ref"] + ) + def test_mpls_lib_step9(): logger.info("Test (step 9): verify MPLS LIB") @@ -656,19 +739,22 @@ def test_mpls_lib_step9(): if tgen.routers_have_failure(): pytest.skip(tgen.errors) - for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: - router_compare_json_output(rname, "show mpls table json", - outputs[rname][9]["show_mpls_table.ref"]) + for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: + router_compare_json_output( + rname, "show mpls table json", outputs[rname][9]["show_mpls_table.ref"] + ) + # Memory leak test template def test_memory_leak(): "Run the memory leak test and report results." tgen = get_topogen() if not tgen.is_memleak_enabled(): - pytest.skip('Memory leak test/report is disabled') + pytest.skip("Memory leak test/report is disabled") tgen.report_memory_leaks() -if __name__ == '__main__': + +if __name__ == "__main__": args = ["-s"] + sys.argv[1:] sys.exit(pytest.main(args)) diff --git a/tests/topotests/ldp-topo1/test_ldp_topo1.py b/tests/topotests/ldp-topo1/test_ldp_topo1.py index 31adeafbf6..9822686dfc 100644 --- a/tests/topotests/ldp-topo1/test_ldp_topo1.py +++ b/tests/topotests/ldp-topo1/test_ldp_topo1.py @@ -647,9 +647,11 @@ def test_zebra_ipv4_routingTable(): else: print("r%s ok" % i) - assert failures == 0, ( - "IPv4 Zebra Routing Table verification failed for router r%s:\n%s" - % (i, diff) + assert ( + failures == 0 + ), "IPv4 Zebra Routing Table verification failed for router r%s:\n%s" % ( + i, + diff, ) # Make sure that all daemons are running diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index 6c24b6ddbb..0f591e58f0 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -1152,7 +1152,7 @@ def generate_ips(network, no_of_ips): mask = int(start_ipaddr.split("/")[1]) else: logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr)) - assert(0) + assert 0 addr_type = validate_ip_address(start_ip) if addr_type == "ipv4": @@ -2562,7 +2562,7 @@ def verify_rib( tag=None, metric=None, fib=None, - count_only=False + count_only=False, ): """ Data will be read from input_dict or input JSON file, API will generate @@ -2754,8 +2754,10 @@ def verify_rib( "Nexthops are missing for " "route {} in RIB of router {}: " "expected {}, found {}\n".format( - st_rt, dut, len(next_hop), - len(found_hops) + st_rt, + dut, + len(next_hop), + len(found_hops), ) ) return errormsg @@ -2802,7 +2804,11 @@ def verify_rib( errormsg = ( "[DUT: {}]: tag value {}" " is not matched for" - " route {} in RIB \n".format(dut, _tag, st_rt,) + " route {} in RIB \n".format( + dut, + _tag, + st_rt, + ) ) return errormsg @@ -2819,7 +2825,11 @@ def verify_rib( errormsg = ( "[DUT: {}]: metric value " "{} is not matched for " - "route {} in RIB \n".format(dut, metric, st_rt,) + "route {} in RIB \n".format( + dut, + metric, + st_rt, + ) ) return errormsg @@ -2868,7 +2878,9 @@ def verify_rib( for advertise_network_dict in advertise_network: if "vrf" in advertise_network_dict: - cmd = "{} vrf {} json".format(command, advertise_network_dict["vrf"]) + cmd = "{} vrf {} json".format( + command, advertise_network_dict["vrf"] + ) else: cmd = "{} json".format(command) @@ -2947,6 +2959,7 @@ def verify_rib( logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True + @retry(attempts=6, wait=2, return_is_str=True) def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None): """ diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index 9f3d4841b0..5c7514a641 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -874,7 +874,11 @@ def verify_ospf_rib( errormsg = ( "[DUT: {}]: tag value {}" " is not matched for" - " route {} in RIB \n".format(dut, _tag, st_rt,) + " route {} in RIB \n".format( + dut, + _tag, + st_rt, + ) ) return errormsg @@ -891,7 +895,11 @@ def verify_ospf_rib( errormsg = ( "[DUT: {}]: metric value " "{} is not matched for " - "route {} in RIB \n".format(dut, metric, st_rt,) + "route {} in RIB \n".format( + dut, + metric, + st_rt, + ) ) return errormsg diff --git a/tests/topotests/lib/test/test_json.py b/tests/topotests/lib/test/test_json.py index b85e193d3b..7b3c8593cc 100755 --- a/tests/topotests/lib/test/test_json.py +++ b/tests/topotests/lib/test/test_json.py @@ -107,16 +107,25 @@ def test_json_intersect_multilevel_true(): dcomplete = { "i1": "item1", "i2": "item2", - "i3": {"i100": "item100",}, + "i3": { + "i100": "item100", + }, "i4": { - "i41": {"i411": "item411",}, - "i42": {"i421": "item421", "i422": "item422",}, + "i41": { + "i411": "item411", + }, + "i42": { + "i421": "item421", + "i422": "item422", + }, }, } dsub1 = { "i1": "item1", - "i3": {"i100": "item100",}, + "i3": { + "i100": "item100", + }, "i10": None, } dsub2 = { @@ -126,10 +135,36 @@ def test_json_intersect_multilevel_true(): } dsub3 = { "i2": "item2", - "i4": {"i41": {"i411": "item411",}, "i42": {"i422": "item422", "i450": None,}}, + "i4": { + "i41": { + "i411": "item411", + }, + "i42": { + "i422": "item422", + "i450": None, + }, + }, + } + dsub4 = { + "i2": "item2", + "i4": { + "i41": {}, + "i42": { + "i450": None, + }, + }, + } + dsub5 = { + "i2": "item2", + "i3": { + "i100": "item100", + }, + "i4": { + "i42": { + "i450": None, + } + }, } - dsub4 = {"i2": "item2", "i4": {"i41": {}, "i42": {"i450": None,}}} - dsub5 = {"i2": "item2", "i3": {"i100": "item100",}, "i4": {"i42": {"i450": None,}}} assert json_cmp(dcomplete, dsub1) is None assert json_cmp(dcomplete, dsub2) is None @@ -144,17 +179,26 @@ def test_json_intersect_multilevel_false(): dcomplete = { "i1": "item1", "i2": "item2", - "i3": {"i100": "item100",}, + "i3": { + "i100": "item100", + }, "i4": { - "i41": {"i411": "item411",}, - "i42": {"i421": "item421", "i422": "item422",}, + "i41": { + "i411": "item411", + }, + "i42": { + "i421": "item421", + "i422": "item422", + }, }, } # Incorrect sub-level value dsub1 = { "i1": "item1", - "i3": {"i100": "item00",}, + "i3": { + "i100": "item00", + }, "i10": None, } # Inexistent sub-level @@ -166,14 +210,41 @@ def test_json_intersect_multilevel_false(): # Inexistent sub-level value dsub3 = { "i1": "item1", - "i3": {"i100": None,}, + "i3": { + "i100": None, + }, } # Inexistent sub-sub-level value - dsub4 = {"i4": {"i41": {"i412": "item412",}, "i42": {"i421": "item421",}}} + dsub4 = { + "i4": { + "i41": { + "i412": "item412", + }, + "i42": { + "i421": "item421", + }, + } + } # Invalid sub-sub-level value - dsub5 = {"i4": {"i41": {"i411": "item411",}, "i42": {"i421": "item420000",}}} + dsub5 = { + "i4": { + "i41": { + "i411": "item411", + }, + "i42": { + "i421": "item420000", + }, + } + } # sub-sub-level should be value - dsub6 = {"i4": {"i41": {"i411": "item411",}, "i42": "foobar",}} + dsub6 = { + "i4": { + "i41": { + "i411": "item411", + }, + "i42": "foobar", + } + } assert json_cmp(dcomplete, dsub1) is not None assert json_cmp(dcomplete, dsub2) is not None @@ -187,7 +258,15 @@ def test_json_with_list_sucess(): "Test successful json comparisons that have lists." dcomplete = { - "list": [{"i1": "item 1", "i2": "item 2",}, {"i10": "item 10",},], + "list": [ + { + "i1": "item 1", + "i2": "item 2", + }, + { + "i10": "item 10", + }, + ], "i100": "item 100", } @@ -197,12 +276,19 @@ def test_json_with_list_sucess(): } # Test list correct list items dsub2 = { - "list": [{"i1": "item 1",},], + "list": [ + { + "i1": "item 1", + }, + ], "i100": "item 100", } # Test list correct list size dsub3 = { - "list": [{}, {},], + "list": [ + {}, + {}, + ], } assert json_cmp(dcomplete, dsub1) is None @@ -214,7 +300,15 @@ def test_json_with_list_failure(): "Test failed json comparisons that have lists." dcomplete = { - "list": [{"i1": "item 1", "i2": "item 2",}, {"i10": "item 10",},], + "list": [ + { + "i1": "item 1", + "i2": "item 2", + }, + { + "i10": "item 10", + }, + ], "i100": "item 100", } @@ -224,12 +318,20 @@ def test_json_with_list_failure(): } # Test list incorrect list items dsub2 = { - "list": [{"i1": "item 2",},], + "list": [ + { + "i1": "item 2", + }, + ], "i100": "item 100", } # Test list correct list size dsub3 = { - "list": [{}, {}, {},], + "list": [ + {}, + {}, + {}, + ], } assert json_cmp(dcomplete, dsub1) is not None @@ -241,20 +343,52 @@ def test_json_list_start_success(): "Test JSON encoded data that starts with a list that should succeed." dcomplete = [ - {"id": 100, "value": "abc",}, - {"id": 200, "value": "abcd",}, - {"id": 300, "value": "abcde",}, + { + "id": 100, + "value": "abc", + }, + { + "id": 200, + "value": "abcd", + }, + { + "id": 300, + "value": "abcde", + }, ] - dsub1 = [{"id": 100, "value": "abc",}] + dsub1 = [ + { + "id": 100, + "value": "abc", + } + ] - dsub2 = [{"id": 100, "value": "abc",}, {"id": 200, "value": "abcd",}] + dsub2 = [ + { + "id": 100, + "value": "abc", + }, + { + "id": 200, + "value": "abcd", + }, + ] - dsub3 = [{"id": 300, "value": "abcde",}] + dsub3 = [ + { + "id": 300, + "value": "abcde", + } + ] dsub4 = [] - dsub5 = [{"id": 100,}] + dsub5 = [ + { + "id": 100, + } + ] assert json_cmp(dcomplete, dsub1) is None assert json_cmp(dcomplete, dsub2) is None @@ -272,13 +406,44 @@ def test_json_list_start_failure(): {"id": 300, "value": "abcde"}, ] - dsub1 = [{"id": 100, "value": "abcd",}] + dsub1 = [ + { + "id": 100, + "value": "abcd", + } + ] - dsub2 = [{"id": 100, "value": "abc",}, {"id": 200, "value": "abc",}] + dsub2 = [ + { + "id": 100, + "value": "abc", + }, + { + "id": 200, + "value": "abc", + }, + ] - dsub3 = [{"id": 100, "value": "abc",}, {"id": 350, "value": "abcde",}] + dsub3 = [ + { + "id": 100, + "value": "abc", + }, + { + "id": 350, + "value": "abcde", + }, + ] - dsub4 = [{"value": "abcx",}, {"id": 300, "value": "abcde",}] + dsub4 = [ + { + "value": "abcx", + }, + { + "id": 300, + "value": "abcde", + }, + ] assert json_cmp(dcomplete, dsub1) is not None assert json_cmp(dcomplete, dsub2) is not None diff --git a/tests/topotests/lib/topogen.py b/tests/topotests/lib/topogen.py index eaf7f90479..3f6a1baa0c 100644 --- a/tests/topotests/lib/topogen.py +++ b/tests/topotests/lib/topogen.py @@ -336,7 +336,9 @@ class Topogen(object): for gear in self.gears.values(): errors += gear.stop() if len(errors) > 0: - logger.error("Errors found post shutdown - details follow: {}".format(errors)) + logger.error( + "Errors found post shutdown - details follow: {}".format(errors) + ) self.net.stop() diff --git a/tests/topotests/lib/topotest.py b/tests/topotests/lib/topotest.py index 20d60ebbef..4113faa767 100644 --- a/tests/topotests/lib/topotest.py +++ b/tests/topotests/lib/topotest.py @@ -609,8 +609,10 @@ def interface_set_status(node, ifacename, ifaceaction=False, vrf_name=None): ifacename, str_ifaceaction ) else: - cmd = 'vtysh -c "configure terminal" -c "interface {0} vrf {1}" -c "{2}"'.format( - ifacename, vrf_name, str_ifaceaction + cmd = ( + 'vtysh -c "configure terminal" -c "interface {0} vrf {1}" -c "{2}"'.format( + ifacename, vrf_name, str_ifaceaction + ) ) node.run(cmd) @@ -924,40 +926,44 @@ def checkAddressSanitizerError(output, router, component, logdir=""): ) if addressSanitizerLog: # Find Calling Test. Could be multiple steps back - testframe=sys._current_frames().values()[0] - level=0 + testframe = sys._current_frames().values()[0] + level = 0 while level < 10: - test=os.path.splitext(os.path.basename(testframe.f_globals["__file__"]))[0] + test = os.path.splitext( + os.path.basename(testframe.f_globals["__file__"]) + )[0] if (test != "topotest") and (test != "topogen"): # Found the calling test - callingTest=os.path.basename(testframe.f_globals["__file__"]) + callingTest = os.path.basename(testframe.f_globals["__file__"]) break - level=level+1 - testframe=testframe.f_back - if (level >= 10): + level = level + 1 + testframe = testframe.f_back + if level >= 10: # somehow couldn't find the test script. - callingTest="unknownTest" + callingTest = "unknownTest" # # Now finding Calling Procedure - level=0 + level = 0 while level < 20: - callingProc=sys._getframe(level).f_code.co_name - if ((callingProc != "processAddressSanitizerError") and - (callingProc != "checkAddressSanitizerError") and - (callingProc != "checkRouterCores") and - (callingProc != "stopRouter") and - (callingProc != "__stop_internal") and - (callingProc != "stop") and - (callingProc != "stop_topology") and - (callingProc != "checkRouterRunning") and - (callingProc != "check_router_running") and - (callingProc != "routers_have_failure")): + callingProc = sys._getframe(level).f_code.co_name + if ( + (callingProc != "processAddressSanitizerError") + and (callingProc != "checkAddressSanitizerError") + and (callingProc != "checkRouterCores") + and (callingProc != "stopRouter") + and (callingProc != "__stop_internal") + and (callingProc != "stop") + and (callingProc != "stop_topology") + and (callingProc != "checkRouterRunning") + and (callingProc != "check_router_running") + and (callingProc != "routers_have_failure") + ): # Found the calling test break - level=level+1 - if (level >= 20): + level = level + 1 + if level >= 20: # something wrong - couldn't found the calling test function - callingProc="unknownProc" + callingProc = "unknownProc" with open("/tmp/AddressSanitzer.txt", "a") as addrSanFile: sys.stderr.write( "AddressSanitizer error in topotest `%s`, test `%s`, router `%s`\n\n" @@ -979,7 +985,6 @@ def checkAddressSanitizerError(output, router, component, logdir=""): addrSanFile.write("\n---------------\n") return - addressSanitizerError = re.search( "(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ", output ) @@ -989,16 +994,20 @@ def checkAddressSanitizerError(output, router, component, logdir=""): # No Address Sanitizer Error in Output. Now check for AddressSanitizer daemon file if logdir: - filepattern=logdir+"/"+router+"/"+component+".asan.*" - logger.debug("Log check for %s on %s, pattern %s\n" % (component, router, filepattern)) + filepattern = logdir + "/" + router + "/" + component + ".asan.*" + logger.debug( + "Log check for %s on %s, pattern %s\n" % (component, router, filepattern) + ) for file in glob.glob(filepattern): with open(file, "r") as asanErrorFile: - asanError=asanErrorFile.read() + asanError = asanErrorFile.read() addressSanitizerError = re.search( "(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ", asanError - ) + ) if addressSanitizerError: - processAddressSanitizerError(addressSanitizerError, asanError, router, component) + processAddressSanitizerError( + addressSanitizerError, asanError, router, component + ) return True return False @@ -1065,7 +1074,7 @@ class Router(Node): if self.logdir is None: cur_test = os.environ["PYTEST_CURRENT_TEST"] self.logdir = "/tmp/topotests/" + cur_test[ - cur_test.find("/")+1 : cur_test.find(".py") + cur_test.find("/") + 1 : cur_test.find(".py") ].replace("/", ".") # If the logdir is not created, then create it and set the @@ -1073,7 +1082,7 @@ class Router(Node): if not os.path.isdir(self.logdir): os.system("mkdir -p " + self.logdir + "/" + name) os.system("chmod -R go+rw /tmp/topotests") - # Erase logs of previous run + # Erase logs of previous run os.system("rm -rf " + self.logdir + "/" + name) self.daemondir = None diff --git a/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py b/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py index 6d44d02e5e..2421b312d2 100644 --- a/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py +++ b/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py @@ -320,8 +320,10 @@ def test_ospf_link_down_kernel_route(): # Run test function until we get an result. Wait at most 60 seconds. test_func = partial(compare_show_ip_route_vrf, router.name, expected) result, diff = topotest.run_and_expect(test_func, "", count=140, wait=0.5) - assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down: {}'.format( - router.name, diff + assertmsg = ( + 'OSPF IPv4 route mismatch in router "{}" after link down: {}'.format( + router.name, diff + ) ) assert result, assertmsg diff --git a/tests/topotests/ospf-topo1/test_ospf_topo1.py b/tests/topotests/ospf-topo1/test_ospf_topo1.py index 95193afb2a..24806dd8fc 100644 --- a/tests/topotests/ospf-topo1/test_ospf_topo1.py +++ b/tests/topotests/ospf-topo1/test_ospf_topo1.py @@ -310,7 +310,10 @@ def test_ospf_json(): # r4 has more interfaces for area 0.0.0.1 if router.name == "r4": expected["areas"]["0.0.0.1"].update( - {"areaIfActiveCounter": 2, "areaIfTotalCounter": 2,} + { + "areaIfActiveCounter": 2, + "areaIfTotalCounter": 2, + } ) # router 3 has an additional area @@ -372,16 +375,25 @@ def test_ospf_link_down_kernel_route(): } if router.name == "r1" or router.name == "r2": expected.update( - {"10.0.10.0/24": None, "172.16.0.0/24": None, "172.16.1.0/24": None,} + { + "10.0.10.0/24": None, + "172.16.0.0/24": None, + "172.16.1.0/24": None, + } ) elif router.name == "r3" or router.name == "r4": expected.update( - {"10.0.1.0/24": None, "10.0.2.0/24": None,} + { + "10.0.1.0/24": None, + "10.0.2.0/24": None, + } ) # Route '10.0.3.0' is no longer available for r4 since it is down. if router.name == "r4": expected.update( - {"10.0.3.0/24": None,} + { + "10.0.3.0/24": None, + } ) assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down'.format( router.name @@ -443,12 +455,17 @@ def test_ospf6_link_down_kernel_route(): ) elif router.name == "r3" or router.name == "r4": expected.update( - {"2001:db8:1::/64": None, "2001:db8:2::/64": None,} + { + "2001:db8:1::/64": None, + "2001:db8:2::/64": None, + } ) # Route '2001:db8:3::/64' is no longer available for r4 since it is down. if router.name == "r4": expected.update( - {"2001:db8:3::/64": None,} + { + "2001:db8:3::/64": None, + } ) assertmsg = 'OSPF IPv6 route mismatch in router "{}" after link down'.format( router.name diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py index 3b37b8a92f..5ef6b9b0be 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py @@ -252,7 +252,11 @@ def test_ospf_ecmp_tc16_p0(request): input_dict = { "r0": { "static_routes": [ - {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",} + { + "network": NETWORK["ipv4"][0], + "no_of_ip": 5, + "next_hop": "Null0", + } ] } } @@ -415,7 +419,11 @@ def test_ospf_ecmp_tc17_p0(request): input_dict = { "r0": { "static_routes": [ - {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",} + { + "network": NETWORK["ipv4"][0], + "no_of_ip": 5, + "next_hop": "Null0", + } ] } } diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py b/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py index 04b1f4b878..88667a6ac8 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py @@ -197,6 +197,7 @@ def teardown_module(mod): # Test cases start here. # ################################## + def test_ospf_routemaps_functionality_tc19_p0(request): """ OSPF Route map - Verify OSPF route map support functionality. @@ -215,121 +216,92 @@ def test_ospf_routemaps_functionality_tc19_p0(request): "r0": { "static_routes": [ { - "network": NETWORK['ipv4'][0], + "network": NETWORK["ipv4"][0], "no_of_ip": 5, - "next_hop": 'Null0', + "next_hop": "Null0", } ] } } result = create_static_routes(tgen, input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) - ospf_red_r1 = { - "r0": { - "ospf": { - "redistribute": [{ - "redist_type": "static" - }] - } - } - } + ospf_red_r1 = {"r0": {"ospf": {"redistribute": [{"redist_type": "static"}]}}} result = create_router_ospf(tgen, topo, ospf_red_r1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) - dut = 'r1' - lsid = NETWORK['ipv4'][0].split("/")[0] - rid = routerids[0] - protocol = 'ospf' + dut = "r1" + lsid = NETWORK["ipv4"][0].split("/")[0] + rid = routerids[0] + protocol = "ospf" result = verify_ospf_rib(tgen, dut, input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) ospf_red_r1 = { "r0": { - "ospf": { - "redistribute": [{ - "redist_type": "static", - "del_action": True - }] - } + "ospf": {"redistribute": [{"redist_type": "static", "del_action": True}]} } } result = create_router_ospf(tgen, topo, ospf_red_r1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step( - 'Create prefix-list in R0 to permit 10.0.20.1/32 prefix &' - ' deny 10.0.20.2/32') + "Create prefix-list in R0 to permit 10.0.20.1/32 prefix &" " deny 10.0.20.2/32" + ) # Create ip prefix list pfx_list = { "r0": { "prefix_lists": { "ipv4": { - "pf_list_1_ipv4": [{ - "seqid": 10, - "network": NETWORK['ipv4'][0], - "action": "permit" - }, - { - "seqid": 11, - "network": "any", - "action": "deny" - } + "pf_list_1_ipv4": [ + { + "seqid": 10, + "network": NETWORK["ipv4"][0], + "action": "permit", + }, + {"seqid": 11, "network": "any", "action": "deny"}, ] } } } } result = create_prefix_lists(tgen, pfx_list) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) # Create route map routemaps = { - "r0": { - "route_maps": { - "rmap_ipv4": [{ + "r0": { + "route_maps": { + "rmap_ipv4": [ + { "action": "permit", - "match": { - "ipv4": { - "prefix_lists": - "pf_list_1_ipv4" - } - } - }] - } + "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, + } + ] } + } } result = create_route_maps(tgen, routemaps) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step( "Configure route map rmap1 and redistribute static routes to" - " ospf using route map rmap1") + " ospf using route map rmap1" + ) ospf_red_r1 = { "r0": { "ospf": { - "redistribute": [{ - "redist_type": "static", - "route_map": "rmap_ipv4" - }] + "redistribute": [{"redist_type": "static", "route_map": "rmap_ipv4"}] } } } result = create_router_ospf(tgen, topo, ospf_red_r1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("Change prefix rules to permit 10.0.20.2 and deny 10.0.20.1") # Create ip prefix list @@ -337,65 +309,54 @@ def test_ospf_routemaps_functionality_tc19_p0(request): "r0": { "prefix_lists": { "ipv4": { - "pf_list_1_ipv4": [{ - "seqid": 10, - "network": NETWORK['ipv4'][1], - "action": "permit" - }, - { - "seqid": 11, - "network": "any", - "action": "deny" - } + "pf_list_1_ipv4": [ + { + "seqid": 10, + "network": NETWORK["ipv4"][1], + "action": "permit", + }, + {"seqid": 11, "network": "any", "action": "deny"}, ] } } } } result = create_prefix_lists(tgen, pfx_list) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("Verify that route 10.0.20.2 is allowed and 10.0.20.1 is denied.") - dut = 'r1' + dut = "r1" input_dict = { "r0": { "static_routes": [ - { - "network": NETWORK['ipv4'][1], - "no_of_ip": 1, - "next_hop": 'Null0' - } + {"network": NETWORK["ipv4"][1], "no_of_ip": 1, "next_hop": "Null0"} ] } } result = verify_ospf_rib(tgen, dut, input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) input_dict = { "r0": { "static_routes": [ - { - "network": NETWORK['ipv4'][0], - "no_of_ip": 1, - "next_hop": 'Null0' - } + {"network": NETWORK["ipv4"][0], "no_of_ip": 1, "next_hop": "Null0"} ] } } result = verify_ospf_rib(tgen, dut, input_dict, expected=False) assert result is not True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) - result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, - expected=False) + result = verify_rib( + tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False + ) assert result is not True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) step("Delete and reconfigure prefix list.") # Create ip prefix list @@ -403,114 +364,101 @@ def test_ospf_routemaps_functionality_tc19_p0(request): "r0": { "prefix_lists": { "ipv4": { - "pf_list_1_ipv4": [{ - "seqid": 10, - "network": NETWORK['ipv4'][1], - "action": "permit", - "delete": True - }, - { - "seqid": 11, - "network": "any", - "action": "deny", - "delete": True - } + "pf_list_1_ipv4": [ + { + "seqid": 10, + "network": NETWORK["ipv4"][1], + "action": "permit", + "delete": True, + }, + { + "seqid": 11, + "network": "any", + "action": "deny", + "delete": True, + }, ] } } } } result = create_prefix_lists(tgen, pfx_list) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_prefix_lists(tgen, pfx_list) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) input_dict = { "r0": { "static_routes": [ - { - "network": NETWORK['ipv4'][0], - "no_of_ip": 5, - "next_hop": 'Null0' - } + {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0"} ] } } result = verify_ospf_rib(tgen, dut, input_dict, expected=False) assert result is not True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) - result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, - expected=False) + result = verify_rib( + tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False + ) assert result is not True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) pfx_list = { "r0": { "prefix_lists": { "ipv4": { - "pf_list_1_ipv4": [{ - "seqid": 10, - "network": NETWORK['ipv4'][1], - "action": "permit" - }, - { - "seqid": 11, - "network": "any", - "action": "deny" - } + "pf_list_1_ipv4": [ + { + "seqid": 10, + "network": NETWORK["ipv4"][1], + "action": "permit", + }, + {"seqid": 11, "network": "any", "action": "deny"}, ] } } } } result = create_prefix_lists(tgen, pfx_list) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("Verify that route 10.0.20.2 is allowed and 10.0.20.1 is denied.") - dut = 'r1' + dut = "r1" input_dict = { "r0": { "static_routes": [ - { - "network": NETWORK['ipv4'][1], - "no_of_ip": 1, - "next_hop": 'Null0' - } + {"network": NETWORK["ipv4"][1], "no_of_ip": 1, "next_hop": "Null0"} ] } } result = verify_ospf_rib(tgen, dut, input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) input_dict = { "r0": { "static_routes": [ - { - "network": NETWORK['ipv4'][0], - "no_of_ip": 1, - "next_hop": 'Null0' - } + {"network": NETWORK["ipv4"][0], "no_of_ip": 1, "next_hop": "Null0"} ] } } result = verify_ospf_rib(tgen, dut, input_dict, expected=False) assert result is not True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) - result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, - expected=False) + result = verify_rib( + tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False + ) assert result is not True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) write_test_footer(tc_name) @@ -535,7 +483,11 @@ def test_ospf_routemaps_functionality_tc20_p0(request): input_dict = { "r0": { "static_routes": [ - {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",} + { + "network": NETWORK["ipv4"][0], + "no_of_ip": 5, + "next_hop": "Null0", + } ] } } @@ -663,318 +615,229 @@ def test_ospf_routemaps_functionality_tc21_p0(request): step( "Create static routes(10.0.20.1/32) in R1 and redistribute " - "to OSPF using route map.") + "to OSPF using route map." + ) # Create Static routes input_dict = { "r0": { "static_routes": [ { - "network": NETWORK['ipv4'][0], + "network": NETWORK["ipv4"][0], "no_of_ip": 5, - "next_hop": 'Null0', + "next_hop": "Null0", } ] } } result = create_static_routes(tgen, input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) ospf_red_r0 = { "r0": { "ospf": { - "redistribute": [{ - "redist_type": "static", - "route_map": "rmap_ipv4" - }] + "redistribute": [{"redist_type": "static", "route_map": "rmap_ipv4"}] } } } result = create_router_ospf(tgen, topo, ospf_red_r0) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) # Create route map routemaps = { - "r0": { - "route_maps": { - "rmap_ipv4": [{ - "action": "permit", - "seq_id": 10 - }] - } - } + "r0": {"route_maps": {"rmap_ipv4": [{"action": "permit", "seq_id": 10}]}} } result = create_route_maps(tgen, routemaps) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("Verify that route is advertised to R2.") - dut = 'r1' - protocol = 'ospf' + dut = "r1" + protocol = "ospf" result = verify_ospf_rib(tgen, dut, input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) # Create route map routemaps = { - "r0": { - "route_maps": { - "rmap_ipv4": [{ - "action": "permit", - "delete": True, - "seq_id": 10 - }] + "r0": { + "route_maps": { + "rmap_ipv4": [{"action": "permit", "delete": True, "seq_id": 10}] + } } } - } result = create_route_maps(tgen, routemaps) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) - step(' Configure route map with set clause (set metric)') + step(" Configure route map with set clause (set metric)") # Create route map routemaps = { - "r0": { - "route_maps": { - "rmap_ipv4": [{ - "action": "permit", - "set": { - "med": 123 - }, - "seq_id": 10 - }] + "r0": { + "route_maps": { + "rmap_ipv4": [{"action": "permit", "set": {"med": 123}, "seq_id": 10}] + } } } - } result = create_route_maps(tgen, routemaps) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("Verify that configured metric is applied to ospf routes.") - dut = 'r1' - protocol = 'ospf' + dut = "r1" + protocol = "ospf" result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step( "Configure route map with match clause (match metric) with " - "some actions(change metric).") + "some actions(change metric)." + ) # Create route map routemaps = { - "r0": { - "route_maps": { - "rmap_ipv4": [{ - "action": "permit", - "match": { - "med": 123 - }, - "set": { - "med": 150 - }, - "seq_id": 10 - }] + "r0": { + "route_maps": { + "rmap_ipv4": [ + { + "action": "permit", + "match": {"med": 123}, + "set": {"med": 150}, + "seq_id": 10, + } + ] + } } } - } result = create_route_maps(tgen, routemaps) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("Configure route map with call clause") # Create ip prefix list input_dict_2 = { - 'r0': { - 'prefix_lists': { - 'ipv4': { - 'pf_list_1_ipv4': [{ - 'seqid': 10, - 'network': 'any', - 'action': 'permit' - }] - } + "r0": { + "prefix_lists": { + "ipv4": { + "pf_list_1_ipv4": [ + {"seqid": 10, "network": "any", "action": "permit"} + ] + } } } } result = create_prefix_lists(tgen, input_dict_2) - assert result is True, 'Testcase {} : Failed \n Error: {}'.format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) # Create route map input_dict_3 = { - "r0": { - "route_maps": { - "rmap_ipv4": [{ - "action": "permit", - "match": { - "ipv4": { - "prefix_lists": "pf_list_1_ipv4" - } - }, - "set": { - "med": 150 - }, - "call": "rmap_match_pf_2_ipv4", - "seq_id": 10 - }], - "rmap_match_pf_2_ipv4": [{ - "action": "permit", - "match": { - "ipv4": { - "prefix_lists": "pf_list_1_ipv4" - } - }, - "set": { - "med": 200 - }, - "seq_id": 10 - }] + "r0": { + "route_maps": { + "rmap_ipv4": [ + { + "action": "permit", + "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, + "set": {"med": 150}, + "call": "rmap_match_pf_2_ipv4", + "seq_id": 10, + } + ], + "rmap_match_pf_2_ipv4": [ + { + "action": "permit", + "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, + "set": {"med": 200}, + "seq_id": 10, + } + ], + } } } - } result = create_route_maps(tgen, input_dict_3) - assert result is True, 'Testcase {} : Failed \n Error: {}'.format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_ospf_rib(tgen, dut, input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) # Create route map - routemaps = { - "r0": { - "route_maps": { - "rmap_ipv4": [{ - "delete": True - }] - } - } - } + routemaps = {"r0": {"route_maps": {"rmap_ipv4": [{"delete": True}]}}} result = create_route_maps(tgen, routemaps) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("Configure route map with continue clause") # Create route map input_dict_3 = { - "r0": { - "route_maps": { - "rmap_ipv4": [{ - "action": "permit", - 'seq_id': '10', - "match": { - "ipv4": { - "prefix_lists": "pf_list_1_ipv4" - } - }, - "set": { - "med": 150 - }, - "continue": "30", - "seq_id": 10 - }, - { - "action": "permit", - "match": { - "ipv4": { - "prefix_lists": "pf_list_1_ipv4" - } - }, - "set": { - "med": 100 + "r0": { + "route_maps": { + "rmap_ipv4": [ + { + "action": "permit", + "seq_id": "10", + "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, + "set": {"med": 150}, + "continue": "30", + "seq_id": 10, }, - "seq_id": 20 - }, - { - "action": "permit", - "match": { - "ipv4": { - "prefix_lists": "pf_list_1_ipv4" - } + { + "action": "permit", + "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, + "set": {"med": 100}, + "seq_id": 20, }, - "set": { - "med": 50 + { + "action": "permit", + "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, + "set": {"med": 50}, + "seq_id": 30, }, - "seq_id": 30 - } - ] + ] + } } } - } result = create_route_maps(tgen, input_dict_3) - assert result is True, 'Testcase {} : Failed \n Error: {}'.format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_ospf_rib(tgen, dut, input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("Configure route map with goto clause") # Create route map input_dict_3 = { - "r0": { - "route_maps": { - "rmap_ipv4": [{ - "action": "permit", - 'seq_id': '10', - "match": { - "ipv4": { - "prefix_lists": "pf_list_1_ipv4" - } + "r0": { + "route_maps": { + "rmap_ipv4": [ + { + "action": "permit", + "seq_id": "10", + "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, + "goto": "30", }, - "goto": "30", - }, - { - "action": "permit", - 'seq_id': '20', - "match": { - "ipv4": { - "prefix_lists": "pf_list_1_ipv4" - } + { + "action": "permit", + "seq_id": "20", + "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, + "set": {"med": 100}, }, - "set": { - "med": 100 - } - }, - { - "action": "permit", - 'seq_id': '30', - "match": { - "ipv4": { - "prefix_lists": "pf_list_1_ipv4" - } + { + "action": "permit", + "seq_id": "30", + "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, + "set": {"med": 200}, }, - "set": { - "med": 200 - } - } - ] + ] + } } } - } result = create_route_maps(tgen, input_dict_3) - assert result is True, 'Testcase {} : Failed \n Error: {}'.format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) write_test_footer(tc_name) @@ -1003,7 +866,11 @@ def test_ospf_routemaps_functionality_tc24_p0(request): input_dict = { "r0": { "static_routes": [ - {"network": NETWORK["ipv4"][0], "no_of_ip": 1, "next_hop": "Null0",} + { + "network": NETWORK["ipv4"][0], + "no_of_ip": 1, + "next_hop": "Null0", + } ] } } @@ -1037,9 +904,10 @@ def test_ospf_routemaps_functionality_tc24_p0(request): step("verify that prefix-list is created in R0.") result = verify_prefix_lists(tgen, pfx_list) - assert result is not True, ( - "Testcase {} : Failed \n Prefix list not " - "present. Error: {}".format(tc_name, result) + assert ( + result is not True + ), "Testcase {} : Failed \n Prefix list not " "present. Error: {}".format( + tc_name, result ) # Create route map @@ -1105,9 +973,10 @@ def test_ospf_routemaps_functionality_tc24_p0(request): step("verify that prefix-list is created in R0.") result = verify_prefix_lists(tgen, pfx_list) - assert result is not True, ( - "Testcase {} : Failed \n Prefix list not " - "present. Error: {}".format(tc_name, result) + assert ( + result is not True + ), "Testcase {} : Failed \n Prefix list not " "present. Error: {}".format( + tc_name, result ) # Create route map diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py b/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py index 6ac0b515df..434d7f8ef5 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py @@ -407,7 +407,13 @@ def test_ospf_redistribution_tc6_p0(request): protocol = "ospf" result = verify_rib( - tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh, expected=False, + tgen, + "ipv4", + dut, + input_dict, + protocol=protocol, + next_hop=nh, + expected=False, ) assert result is not True, "Testcase {} : Failed \n Error: {}".format( tc_name, result @@ -549,7 +555,11 @@ def test_ospf_redistribution_tc8_p1(request): input_dict = { "r0": { "static_routes": [ - {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",} + { + "network": NETWORK["ipv4"][0], + "no_of_ip": 5, + "next_hop": "Null0", + } ] } } diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py b/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py index 3a269d853c..a41aae1e82 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py @@ -776,7 +776,6 @@ def test_ospf_show_p1(request): write_test_footer(tc_name) - def test_ospf_dead_tc11_p0(request): """ OSPF timers. @@ -798,224 +797,146 @@ def test_ospf_dead_tc11_p0(request): step("modify dead interval from default value to some other value on r1") topo1 = { - 'r1': { - 'links': { - 'r0': { - 'interface': topo['routers']['r1']['links']['r0'][ - 'interface'], - 'ospf': { - 'hello_interval': 12, - 'dead_interval': 48 - } + "r1": { + "links": { + "r0": { + "interface": topo["routers"]["r1"]["links"]["r0"]["interface"], + "ospf": {"hello_interval": 12, "dead_interval": 48}, } } } } - result = create_interfaces_cfg(tgen, topo1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step( "verify that new timer value is configured and applied using " - "the show ip ospf interface command.") - dut = 'r1' - input_dict= { - 'r1': { - 'links':{ - 'r0': { - 'ospf':{ - 'timerDeadSecs': 48 - } - } - } - } - } + "the show ip ospf interface command." + ) + dut = "r1" + input_dict = {"r1": {"links": {"r0": {"ospf": {"timerDeadSecs": 48}}}}} result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) - step( - "modify dead interval from default value to r1" - "dead interval timer on r2") + step("modify dead interval from default value to r1" "dead interval timer on r2") topo1 = { - 'r0': { - 'links': { - 'r1': { - 'interface': topo['routers']['r0']['links']['r1'][ - 'interface'], - 'ospf': { - 'dead_interval': 48, - 'hello_interval': 12 - } + "r0": { + "links": { + "r1": { + "interface": topo["routers"]["r0"]["links"]["r1"]["interface"], + "ospf": {"dead_interval": 48, "hello_interval": 12}, } } } } result = create_interfaces_cfg(tgen, topo1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) - + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("verify that new timer value is configured.") - input_dict= { - 'r0': { - 'links':{ - 'r1': { - 'ospf':{ - 'timerDeadSecs': 48 - } - } - } - } - } - dut = 'r0' + input_dict = {"r0": {"links": {"r1": {"ospf": {"timerDeadSecs": 48}}}}} + dut = "r0" result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("verify that ospf neighbours are full") ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut) - assert ospf_covergence is True, ("setup_module :Failed \n Error:" - " {}".format(ospf_covergence)) + assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format( + ospf_covergence + ) - step( - "reconfigure the default dead interval timer value to " - "default on r1 and r2") + step("reconfigure the default dead interval timer value to " "default on r1 and r2") topo1 = { - 'r0': { - 'links': { - 'r1': { - 'interface': topo['routers']['r0']['links']['r1'][ - 'interface'], - 'ospf': { - 'dead_interval': 40 - } + "r0": { + "links": { + "r1": { + "interface": topo["routers"]["r0"]["links"]["r1"]["interface"], + "ospf": {"dead_interval": 40}, } } } } result = create_interfaces_cfg(tgen, topo1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) topo1 = { - 'r1': { - 'links': { - 'r0': { - 'interface': topo['routers']['r1']['links']['r0'][ - 'interface'], - 'ospf': { - 'dead_interval': 40 - } + "r1": { + "links": { + "r0": { + "interface": topo["routers"]["r1"]["links"]["r0"]["interface"], + "ospf": {"dead_interval": 40}, } } } } result = create_interfaces_cfg(tgen, topo1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("verify that new timer value is configured.") - input_dict= { - 'r0': { - 'links':{ - 'r1': { - 'ospf':{ - 'timerDeadSecs': 40 - } - } - } - } - } - dut = 'r0' + input_dict = {"r0": {"links": {"r1": {"ospf": {"timerDeadSecs": 40}}}}} + dut = "r0" result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) - + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("verify that ospf neighbours are full") ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut) - assert ospf_covergence is True, ("setup_module :Failed \n Error:" - " {}".format(ospf_covergence)) - + assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format( + ospf_covergence + ) step(" Configure dead timer = 65535 on r1 and r2") topo1 = { - 'r0': { - 'links': { - 'r1': { - 'interface': topo['routers']['r0']['links']['r1'][ - 'interface'], - 'ospf': { - 'dead_interval': 65535 - } + "r0": { + "links": { + "r1": { + "interface": topo["routers"]["r0"]["links"]["r1"]["interface"], + "ospf": {"dead_interval": 65535}, } } } } result = create_interfaces_cfg(tgen, topo1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) topo1 = { - 'r1': { - 'links': { - 'r0': { - 'interface': topo['routers']['r1']['links']['r0'][ - 'interface'], - 'ospf': { - 'dead_interval': 65535 - } + "r1": { + "links": { + "r0": { + "interface": topo["routers"]["r1"]["links"]["r0"]["interface"], + "ospf": {"dead_interval": 65535}, } } } } result = create_interfaces_cfg(tgen, topo1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("verify that new timer value is configured.") - input_dict= { - 'r0': { - 'links':{ - 'r1': { - 'ospf':{ - 'timerDeadSecs': 65535 - } - } - } - } - } - dut = 'r0' + input_dict = {"r0": {"links": {"r1": {"ospf": {"timerDeadSecs": 65535}}}}} + dut = "r0" result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step("verify that ospf neighbours are full") ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut) - assert ospf_covergence is True, ("setup_module :Failed \n Error:" - " {}".format(ospf_covergence)) - + assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format( + ospf_covergence + ) step(" Try configuring timer values outside range for example 65536") topo1 = { - 'r0': { - 'links': { - 'r1': { - 'interface': topo['routers']['r0']['links']['r1'][ - 'interface'], - 'ospf': { - 'dead_interval': 65536 - } + "r0": { + "links": { + "r1": { + "interface": topo["routers"]["r0"]["links"]["r1"]["interface"], + "ospf": {"dead_interval": 65536}, } } } @@ -1023,47 +944,33 @@ def test_ospf_dead_tc11_p0(request): result = create_interfaces_cfg(tgen, topo1) assert result is not True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) step("Unconfigure the dead timer from the interface from r1 and r2.") topo1 = { - 'r1': { - 'links': { - 'r0': { - 'interface': topo['routers']['r1']['links']['r0'][ - 'interface'], - 'ospf': { - 'dead_interval': 65535 - }, - 'delete': True + "r1": { + "links": { + "r0": { + "interface": topo["routers"]["r1"]["links"]["r0"]["interface"], + "ospf": {"dead_interval": 65535}, + "delete": True, } } } } result = create_interfaces_cfg(tgen, topo1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) step( - "Verify that timer value is deleted from intf & " - "set to default value 40 sec.") - input_dict= { - 'r1': { - 'links':{ - 'r0': { - 'ospf':{ - 'timerDeadSecs': 40 - } - } - } - } - } - dut = 'r1' + "Verify that timer value is deleted from intf & " "set to default value 40 sec." + ) + input_dict = {"r1": {"links": {"r0": {"ospf": {"timerDeadSecs": 40}}}}} + dut = "r1" result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) write_test_footer(tc_name) diff --git a/tests/topotests/rip-topo1/test_rip_topo1.py b/tests/topotests/rip-topo1/test_rip_topo1.py index de11b78824..fdafa50aba 100644 --- a/tests/topotests/rip-topo1/test_rip_topo1.py +++ b/tests/topotests/rip-topo1/test_rip_topo1.py @@ -352,9 +352,11 @@ def test_zebra_ipv4_routingTable(): else: print("r%s ok" % i) - assert failures == 0, ( - "Zebra IPv4 Routing Table verification failed for router r%s:\n%s" - % (i, diff) + assert ( + failures == 0 + ), "Zebra IPv4 Routing Table verification failed for router r%s:\n%s" % ( + i, + diff, ) # Make sure that all daemons are still running diff --git a/tests/topotests/ripng-topo1/test_ripng_topo1.py b/tests/topotests/ripng-topo1/test_ripng_topo1.py index 2976cdefe4..4702d33dae 100644 --- a/tests/topotests/ripng-topo1/test_ripng_topo1.py +++ b/tests/topotests/ripng-topo1/test_ripng_topo1.py @@ -373,9 +373,11 @@ def test_zebra_ipv6_routingTable(): else: print("r%s ok" % i) - assert failures == 0, ( - "Zebra IPv6 Routing Table verification failed for router r%s:\n%s" - % (i, diff) + assert ( + failures == 0 + ), "Zebra IPv6 Routing Table verification failed for router r%s:\n%s" % ( + i, + diff, ) # Make sure that all daemons are running diff --git a/tests/topotests/zebra_netlink/test_zebra_netlink.py b/tests/topotests/zebra_netlink/test_zebra_netlink.py index 4da5303641..94baf8438f 100644 --- a/tests/topotests/zebra_netlink/test_zebra_netlink.py +++ b/tests/topotests/zebra_netlink/test_zebra_netlink.py @@ -118,7 +118,12 @@ def test_zebra_netlink_batching(): r1.vtysh_cmd("sharp install routes 2.1.3.7 nexthop 192.168.1.1 100") json_file = "{}/r1/v4_route.json".format(CWD) expected = json.loads(open(json_file).read()) - test_func = partial(topotest.router_json_cmp, r1, "show ip route json", expected,) + test_func = partial( + topotest.router_json_cmp, + r1, + "show ip route json", + expected, + ) _, result = topotest.run_and_expect(test_func, None, count=2, wait=0.5) assertmsg = '"r1" JSON output mismatches' assert result is None, assertmsg -- 2.39.5