diff options
Diffstat (limited to 'tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py')
| -rwxr-xr-x | tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py | 561 |
1 files changed, 247 insertions, 314 deletions
diff --git a/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py b/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py index 4b9f419bf2..bad421768c 100755 --- a/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py +++ b/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py @@ -41,10 +41,11 @@ import sys import time import json import pytest + # Save the Current Working Directory to find configuration files. CWD = os.path.dirname(os.path.realpath(__file__)) -sys.path.append(os.path.join(CWD, '../')) -sys.path.append(os.path.join(CWD, '../../')) +sys.path.append(os.path.join(CWD, "../")) +sys.path.append(os.path.join(CWD, "../../")) # pylint: disable=C0413 # Import topogen and topotest helpers @@ -52,15 +53,17 @@ from lib.topogen import Topogen, get_topogen from mininet.topo import Topo from lib.common_config import ( - start_topology, write_test_header, + start_topology, + write_test_header, write_test_footer, - verify_rib, create_static_routes, check_address_types, - interface_status, reset_config_on_routers + verify_rib, + create_static_routes, + check_address_types, + interface_status, + reset_config_on_routers, ) from lib.topolog import logger -from lib.bgp import ( - verify_bgp_convergence, create_router_bgp, - clear_bgp_and_verify) +from lib.bgp import verify_bgp_convergence, create_router_bgp, clear_bgp_and_verify from lib.topojson import build_topo_from_json, build_config_from_json # Reading the data from JSON File for topology and configuration creation @@ -130,27 +133,32 @@ def setup_module(mod): ADDR_TYPES = check_address_types() BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) - assert BGP_CONVERGENCE is True, ("setup_module :Failed \n Error:" - " {}".format(BGP_CONVERGENCE)) - - link_data = [val for links, val in - topo["routers"]["r2"]["links"].iteritems() - if "r3" in links] + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format( + BGP_CONVERGENCE + ) + + link_data = [ + val + for links, val in topo["routers"]["r2"]["links"].iteritems() + if "r3" in links + ] for adt in ADDR_TYPES: NEXT_HOPS[adt] = [val[adt].split("/")[0] for val in link_data] if adt == "ipv4": - NEXT_HOPS[adt] = sorted( - NEXT_HOPS[adt], key=lambda x: int(x.split(".")[2])) + NEXT_HOPS[adt] = sorted(NEXT_HOPS[adt], key=lambda x: int(x.split(".")[2])) elif adt == "ipv6": NEXT_HOPS[adt] = sorted( - NEXT_HOPS[adt], key=lambda x: int(x.split(':')[-3], 16)) + NEXT_HOPS[adt], key=lambda x: int(x.split(":")[-3], 16) + ) INTF_LIST_R2 = [val["interface"].split("/")[0] for val in link_data] INTF_LIST_R2 = sorted(INTF_LIST_R2, key=lambda x: int(x.split("eth")[1])) - link_data = [val for links, val in - topo["routers"]["r3"]["links"].iteritems() - if "r2" in links] + link_data = [ + val + for links, val in topo["routers"]["r3"]["links"].iteritems() + if "r2" in links + ] INTF_LIST_R3 = [val["interface"].split("/")[0] for val in link_data] INTF_LIST_R3 = sorted(INTF_LIST_R3, key=lambda x: int(x.split("eth")[1])) @@ -179,40 +187,27 @@ def static_or_nw(tgen, topo, tc_name, test_type, dut): input_dict_static = { dut: { "static_routes": [ - { - "network": NETWORK["ipv4"], - "next_hop": NEXT_HOP_IP["ipv4"] - }, - { - "network": NETWORK["ipv6"], - "next_hop": NEXT_HOP_IP["ipv6"] - } + {"network": NETWORK["ipv4"], "next_hop": NEXT_HOP_IP["ipv4"]}, + {"network": NETWORK["ipv6"], "next_hop": NEXT_HOP_IP["ipv6"]}, ] } } logger.info("Configuring static route on router %s", dut) result = create_static_routes(tgen, input_dict_static) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) input_dict_2 = { dut: { "bgp": { "address_family": { "ipv4": { - "unicast": { - "redistribute": [{ - "redist_type": "static" - }] - } + "unicast": {"redistribute": [{"redist_type": "static"}]} }, "ipv6": { - "unicast": { - "redistribute": [{ - "redist_type": "static" - }] - } - } + "unicast": {"redistribute": [{"redist_type": "static"}]} + }, } } } @@ -221,7 +216,8 @@ def static_or_nw(tgen, topo, tc_name, test_type, dut): logger.info("Configuring redistribute static route on router %s", dut) result = create_router_bgp(tgen, topo, input_dict_2) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) elif test_type == "advertise_nw": input_dict_nw = { @@ -230,28 +226,29 @@ def static_or_nw(tgen, topo, tc_name, test_type, dut): "address_family": { "ipv4": { "unicast": { - "advertise_networks": [ - {"network": NETWORK["ipv4"]} - ] + "advertise_networks": [{"network": NETWORK["ipv4"]}] } }, "ipv6": { "unicast": { - "advertise_networks": [ - {"network": NETWORK["ipv6"]} - ] + "advertise_networks": [{"network": NETWORK["ipv6"]}] } - } + }, } } } } - logger.info("Advertising networks %s %s from router %s", - NETWORK["ipv4"], NETWORK["ipv6"], dut) + logger.info( + "Advertising networks %s %s from router %s", + NETWORK["ipv4"], + NETWORK["ipv6"], + dut, + ) result = create_router_bgp(tgen, topo, input_dict_nw) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) @pytest.mark.parametrize("ecmp_num", ["8", "16", "32"]) @@ -274,20 +271,8 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type): "r3": { "bgp": { "address_family": { - "ipv4": { - "unicast": { - "maximum_paths": { - "ebgp": ecmp_num, - } - } - }, - "ipv6": { - "unicast": { - "maximum_paths": { - "ebgp": ecmp_num, - } - } - } + "ipv4": {"unicast": {"maximum_paths": {"ebgp": ecmp_num,}}}, + "ipv6": {"unicast": {"maximum_paths": {"ebgp": ecmp_num,}}}, } } } @@ -295,30 +280,27 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type): logger.info("Configuring bgp maximum-paths %s on router r3", ecmp_num) result = create_router_bgp(tgen, topo, input_dict) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) # Verifying RIB routes dut = "r3" protocol = "bgp" for addr_type in ADDR_TYPES: - input_dict_1 = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict_1, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict_1, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) write_test_footer(tc_name) @@ -338,44 +320,39 @@ def test_ecmp_after_clear_bgp(request): static_or_nw(tgen, topo, tc_name, "redist_static", "r2") for addr_type in ADDR_TYPES: - input_dict_1 = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict_1, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict_1, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) # Clear bgp result = clear_bgp_and_verify(tgen, topo, dut) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) for addr_type in ADDR_TYPES: - input_dict_1 = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict_1, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict_1, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) write_test_footer(tc_name) @@ -395,22 +372,20 @@ def test_ecmp_remove_redistribute_static(request): # Verifying RIB routes dut = "r3" protocol = "bgp" - input_dict_1 = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict_1, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict_1, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) input_dict_2 = { "r2": { @@ -418,22 +393,14 @@ def test_ecmp_remove_redistribute_static(request): "address_family": { "ipv4": { "unicast": { - "redistribute": [{ - "redist_type": "static", - "delete": True - - }] + "redistribute": [{"redist_type": "static", "delete": True}] } }, "ipv6": { "unicast": { - "redistribute": [{ - "redist_type": "static", - "delete": True - - }] + "redistribute": [{"redist_type": "static", "delete": True}] } - } + }, } } } @@ -441,76 +408,60 @@ def test_ecmp_remove_redistribute_static(request): logger.info("Remove redistribute static") result = create_router_bgp(tgen, topo, input_dict_2) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) for addr_type in ADDR_TYPES: # Verifying RIB routes dut = "r3" protocol = "bgp" - input_dict_1 = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3 are deleted", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict_1, - next_hop=[], protocol=protocol, expected=False) - assert result is not True, "Testcase {} : Failed \n Routes still" \ - " present in RIB".format(tc_name) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict_1, + next_hop=[], + protocol=protocol, + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n Routes still" " present in RIB".format(tc_name) logger.info("Enable redistribute static") input_dict_2 = { "r2": { "bgp": { "address_family": { - "ipv4": { - "unicast": { - "redistribute": [{ - "redist_type": "static" - }] - } - }, - "ipv6": { - "unicast": { - "redistribute": [{ - "redist_type": "static" - }] - } - } + "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, } } } } result = create_router_bgp(tgen, topo, input_dict_2) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) for addr_type in ADDR_TYPES: # Verifying RIB routes dut = "r3" protocol = "bgp" - input_dict_1 = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict_1, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict_1, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) write_test_footer(tc_name) @@ -537,37 +488,30 @@ def test_ecmp_shut_bgp_neighbor(request): static_or_nw(tgen, topo, tc_name, "redist_static", "r2") for addr_type in ADDR_TYPES: - input_dict = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) - for intf_num in range(len(INTF_LIST_R2)+1, 16): - intf_val = INTF_LIST_R2[intf_num:intf_num+16] + for intf_num in range(len(INTF_LIST_R2) + 1, 16): + intf_val = INTF_LIST_R2[intf_num : intf_num + 16] - input_dict_1 = { - "r2": { - "interface_list": [intf_val], - "status": "down" - } - } - logger.info("Shutting down neighbor interface {} on r2". - format(intf_val)) + input_dict_1 = {"r2": {"interface_list": [intf_val], "status": "down"}} + logger.info("Shutting down neighbor interface {} on r2".format(intf_val)) result = interface_status(tgen, topo, input_dict_1) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) for addr_type in ADDR_TYPES: if intf_num + 16 < 32: @@ -575,52 +519,37 @@ def test_ecmp_shut_bgp_neighbor(request): else: check_hops = [] - input_dict = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict, - next_hop=check_hops, - protocol=protocol) + result = verify_rib( + tgen, addr_type, dut, input_dict, next_hop=check_hops, protocol=protocol + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) - input_dict_1 = { - "r2": { - "interface_list": INTF_LIST_R2, - "status": "up" - } - } + input_dict_1 = {"r2": {"interface_list": INTF_LIST_R2, "status": "up"}} logger.info("Enabling all neighbor interface {} on r2") result = interface_status(tgen, topo, input_dict_1) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) static_or_nw(tgen, topo, tc_name, "redist_static", "r2") for addr_type in ADDR_TYPES: - input_dict = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) write_test_footer(tc_name) @@ -643,22 +572,20 @@ def test_ecmp_remove_static_route(request): static_or_nw(tgen, topo, tc_name, "redist_static", "r2") for addr_type in ADDR_TYPES: - input_dict_1 = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) result = verify_rib( - tgen, addr_type, dut, input_dict_1, - next_hop=NEXT_HOPS[addr_type], protocol=protocol) + tgen, + addr_type, + dut, + input_dict_1, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) for addr_type in ADDR_TYPES: input_dict_2 = { @@ -667,7 +594,7 @@ def test_ecmp_remove_static_route(request): { "network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type], - "delete": True + "delete": True, } ] } @@ -676,23 +603,29 @@ def test_ecmp_remove_static_route(request): logger.info("Remove static routes") result = create_static_routes(tgen, input_dict_2) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) logger.info("Verifying %s routes on r3 are removed", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict_2, - next_hop=[], protocol=protocol, expected=False) - assert result is not True, "Testcase {} : Failed \n Routes still" \ - " present in RIB".format(tc_name) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict_2, + next_hop=[], + protocol=protocol, + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n Routes still" " present in RIB".format(tc_name) for addr_type in ADDR_TYPES: # Enable static routes input_dict_4 = { "r2": { "static_routes": [ - { - "network": NETWORK[addr_type], - "next_hop": NEXT_HOP_IP[addr_type] - } + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} ] } } @@ -700,14 +633,21 @@ def test_ecmp_remove_static_route(request): logger.info("Enable static route") result = create_static_routes(tgen, input_dict_4) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict_4, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict_4, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) def test_ecmp_remove_nw_advertise(request): @@ -727,22 +667,20 @@ def test_ecmp_remove_nw_advertise(request): reset_config_on_routers(tgen) static_or_nw(tgen, topo, tc_name, "advertise_nw", "r2") for addr_type in ADDR_TYPES: - input_dict = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) input_dict_3 = { "r2": { @@ -750,64 +688,59 @@ def test_ecmp_remove_nw_advertise(request): "address_family": { "ipv4": { "unicast": { - "advertise_networks": [{ - "network": NETWORK["ipv4"], - "delete": True - }] - } - }, + "advertise_networks": [ + {"network": NETWORK["ipv4"], "delete": True} + ] + } + }, "ipv6": { "unicast": { - "advertise_networks": [{ - "network": NETWORK["ipv6"], - "delete": True - }] - } + "advertise_networks": [ + {"network": NETWORK["ipv6"], "delete": True} + ] } - } + }, } } } + } logger.info("Withdraw advertised networks") result = create_router_bgp(tgen, topo, input_dict_3) - assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) for addr_type in ADDR_TYPES: - input_dict = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict, - next_hop=[], protocol=protocol, expected=False) - assert result is not True, "Testcase {} : Failed \n Routes still" \ - " present in RIB".format(tc_name) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict, + next_hop=[], + protocol=protocol, + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n Routes still" " present in RIB".format(tc_name) static_or_nw(tgen, topo, tc_name, "advertise_nw", "r2") for addr_type in ADDR_TYPES: - input_dict = { - "r3": { - "static_routes": [ - { - "network": NETWORK[addr_type] - } - ] - } - } + input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}} logger.info("Verifying %s routes on r3", addr_type) - result = verify_rib(tgen, addr_type, dut, input_dict, - next_hop=NEXT_HOPS[addr_type], - protocol=protocol) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict, + next_hop=NEXT_HOPS[addr_type], + protocol=protocol, + ) assert result is True, "Testcase {} : Failed \n Error: {}".format( - tc_name, result) + tc_name, result + ) write_test_footer(tc_name) |
