diff options
Diffstat (limited to 'tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py')
| -rw-r--r--[-rwxr-xr-x] | tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py | 930 |
1 files changed, 744 insertions, 186 deletions
diff --git a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py index e99111d90b..b3b7256ac4 100755..100644 --- a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py +++ b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py @@ -37,6 +37,8 @@ Test steps - Verify clear bgp - Test bgp convergence with loopback interface - Test advertise network using network command +- Verify routes not installed in zebra when /32 routes received + with loopback BGP session subnet """ import os @@ -48,8 +50,8 @@ from copy import deepcopy # Save the Current Working Directory to find configuration files. CWD = os.path.dirname(os.path.realpath(__file__)) -sys.path.append(os.path.join(CWD, '../')) -sys.path.append(os.path.join(CWD, '../lib/')) +sys.path.append(os.path.join(CWD, "../")) +sys.path.append(os.path.join(CWD, "../lib/")) # Required to instantiate the topology builder class. @@ -59,26 +61,60 @@ from lib.topogen import Topogen, get_topogen from mininet.topo import Topo from lib.common_config import ( - start_topology, write_test_header, - write_test_footer, reset_config_on_routers, create_static_routes, - verify_rib, verify_admin_distance_for_static_routes + step, + start_topology, + write_test_header, + write_test_footer, + reset_config_on_routers, + create_static_routes, + verify_rib, + verify_admin_distance_for_static_routes, + check_address_types, + apply_raw_config, + addKernelRoute, + verify_fib_routes, + create_prefix_lists, + create_route_maps, + verify_bgp_community, + required_linux_kernel_version, ) from lib.topolog import logger from lib.bgp import ( - verify_bgp_convergence, create_router_bgp, verify_router_id, - modify_as_number, verify_as_numbers, clear_bgp_and_verify, - verify_bgp_timers_and_functionality + verify_bgp_convergence, + create_router_bgp, + verify_router_id, + modify_as_number, + verify_as_numbers, + clear_bgp_and_verify, + verify_bgp_timers_and_functionality, + verify_bgp_rib, ) from lib.topojson import build_topo_from_json, build_config_from_json # Reading the data from JSON File for topology creation jsonFile = "{}/bgp_basic_functionality.json".format(CWD) try: - with open(jsonFile, 'r') as topoJson: + with open(jsonFile, "r") as topoJson: topo = json.load(topoJson) except IOError: assert False, "Could not read file {}".format(jsonFile) +# Global Variable +KEEPALIVETIMER = 2 +HOLDDOWNTIMER = 6 +r1_ipv4_loopback = "1.0.1.0/24" +r2_ipv4_loopback = "1.0.2.0/24" +r3_ipv4_loopback = "1.0.3.0/24" +r4_ipv4_loopback = "1.0.4.0/24" +r1_ipv6_loopback = "2001:db8:f::1:0/120" +r2_ipv6_loopback = "2001:db8:f::2:0/120" +r3_ipv6_loopback = "2001:db8:f::3:0/120" +r4_ipv6_loopback = "2001:db8:f::4:0/120" +NETWORK = { + "ipv4": ["100.1.1.1/32", "100.1.1.2/32"], + "ipv6": ["100::1/128", "100::2/128"], +} + class CreateTopo(Topo): """ @@ -102,6 +138,11 @@ def setup_module(mod): * `mod`: module name """ + # Required linux kernel version for this suite to run. + result = required_linux_kernel_version("4.15") + if result is not True: + pytest.skip("Kernel requirements are not met") + testsuite_run_time = time.asctime(time.localtime(time.time())) logger.info("Testsuite start time: {}".format(testsuite_run_time)) logger.info("=" * 40) @@ -119,10 +160,13 @@ def setup_module(mod): # Creating configuration from JSON build_config_from_json(tgen, topo) + global ADDR_TYPES global BGP_CONVERGENCE + ADDR_TYPES = check_address_types() BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) - assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}". \ - format(BGP_CONVERGENCE) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) logger.info("Running setup_module() done") @@ -137,8 +181,9 @@ def teardown_module(): # Stop toplogy and Remove tmp files tgen.stop_topology() - logger.info("Testsuite end time: {}". - format(time.asctime(time.localtime(time.time())))) + logger.info( + "Testsuite end time: {}".format(time.asctime(time.localtime(time.time()))) + ) logger.info("=" * 40) @@ -154,7 +199,7 @@ def test_modify_and_delete_router_id(request): tgen = get_topogen() if BGP_CONVERGENCE is not True: - pytest.skip('skipped because of BGP Convergence failure') + pytest.skip("skipped because of BGP Convergence failure") # test case name tc_name = request.node.name @@ -162,59 +207,31 @@ def test_modify_and_delete_router_id(request): # Modify router id input_dict = { - 'r1': { - "bgp": { - 'router_id': '12.12.12.12' - } - }, - 'r2': { - "bgp": { - 'router_id': '22.22.22.22' - } - }, - 'r3': { - "bgp": { - 'router_id': '33.33.33.33' - } - }, + "r1": {"bgp": {"router_id": "12.12.12.12"}}, + "r2": {"bgp": {"router_id": "22.22.22.22"}}, + "r3": {"bgp": {"router_id": "33.33.33.33"}}, } result = create_router_bgp(tgen, topo, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}".\ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) # Verifying router id once modified result = verify_router_id(tgen, topo, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}".\ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) # Delete router id input_dict = { - 'r1': { - "bgp": { - 'del_router_id': True - } - }, - 'r2': { - "bgp": { - 'del_router_id': True - } - }, - 'r3': { - "bgp": { - 'del_router_id': True - } - }, + "r1": {"bgp": {"del_router_id": True}}, + "r2": {"bgp": {"del_router_id": True}}, + "r3": {"bgp": {"del_router_id": True}}, } result = create_router_bgp(tgen, topo, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) # Verifying router id once deleted # Once router-id is deleted, highest interface ip should become # router-id result = verify_router_id(tgen, topo, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) write_test_footer(tc_name) @@ -226,41 +243,94 @@ def test_bgp_config_with_4byte_as_number(request): tgen = get_topogen() if BGP_CONVERGENCE is not True: - pytest.skip('skipped because of BGP Convergence failure') + pytest.skip("skipped because of BGP Convergence failure") # test case name tc_name = request.node.name write_test_header(tc_name) input_dict = { - "r1": { - "bgp": { - "local_as": 131079 - } - }, - "r2": { - "bgp": { - "local_as": 131079 - } - }, - "r3": { - "bgp": { - "local_as": 131079 - } - }, - "r4": { - "bgp": { - "local_as": 131080 - } - } + "r1": {"bgp": {"local_as": 131079}}, + "r2": {"bgp": {"local_as": 131079}}, + "r3": {"bgp": {"local_as": 131079}}, + "r4": {"bgp": {"local_as": 131080}}, + } + result = modify_as_number(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + result = verify_as_numbers(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_BGP_config_with_invalid_ASN_p2(request): + """ + Configure BGP with invalid ASN(ex - 0, reserved ASN) and verify test case + ended up with error + """ + + tgen = get_topogen() + global BGP_CONVERGENCE + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Api call to modify AS number + input_dict = { + "r1": {"bgp": {"local_as": 0,}}, + "r2": {"bgp": {"local_as": 0,}}, + "r3": {"bgp": {"local_as": 0,}}, + "r4": {"bgp": {"local_as": 64000,}}, + } + result = modify_as_number(tgen, topo, input_dict) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: {}".format(result)) + logger.info("BGP config is not created because of invalid ASNs") + + write_test_footer(tc_name) + + +def test_BGP_config_with_2byteAS_and_4byteAS_number_p1(request): + """ + Configure BGP with 4 byte and 2 byte ASN and verify BGP is converged + """ + + tgen = get_topogen() + global BGP_CONVERGENCE + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Api call to modify AS number + input_dict = { + "r1": {"bgp": {"local_as": 131079}}, + "r2": {"bgp": {"local_as": 131079}}, + "r3": {"bgp": {"local_as": 131079}}, + "r4": {"bgp": {"local_as": 111}}, } result = modify_as_number(tgen, topo, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + if result != True: + assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result) result = verify_as_numbers(tgen, topo, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + if result != True: + assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result) + + # Api call verify whether BGP is converged + result = verify_bgp_convergence(tgen, topo) + if result != True: + assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result) write_test_footer(tc_name) @@ -272,7 +342,7 @@ def test_bgp_timers_functionality(request): tgen = get_topogen() if BGP_CONVERGENCE is not True: - pytest.skip('skipped because of BGP Convergence failure') + pytest.skip("skipped because of BGP Convergence failure") # test case name tc_name = request.node.name @@ -290,10 +360,10 @@ def test_bgp_timers_functionality(request): "unicast": { "neighbor": { "r2": { - "dest_link":{ + "dest_link": { "r1": { - "keepalivetimer": 60, - "holddowntimer": 180, + "keepalivetimer": KEEPALIVETIMER, + "holddowntimer": HOLDDOWNTIMER, } } } @@ -305,28 +375,24 @@ def test_bgp_timers_functionality(request): } } result = create_router_bgp(tgen, topo, deepcopy(input_dict)) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) # Api call to clear bgp, so timer modification would take place - clear_bgp_and_verify(tgen, topo, 'r1') + clear_bgp_and_verify(tgen, topo, "r1") # Verifying bgp timers functionality result = verify_bgp_timers_and_functionality(tgen, topo, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) write_test_footer(tc_name) - - def test_static_routes(request): """ Test to create and verify static routes. """ tgen = get_topogen() if BGP_CONVERGENCE is not True: - pytest.skip('skipped because of BGP Convergence failure') + pytest.skip("skipped because of BGP Convergence failure") # test case name tc_name = request.node.name @@ -338,17 +404,18 @@ def test_static_routes(request): # Api call to create static routes input_dict = { "r1": { - "static_routes": [{ - "network": "10.0.20.1/32", - "no_of_ip": 9, - "admin_distance": 100, - "next_hop": "10.0.0.2" - }] + "static_routes": [ + { + "network": "10.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.2", + } + ] } } result = create_static_routes(tgen, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) # Api call to redistribute static routes input_dict_1 = { @@ -359,7 +426,7 @@ def test_static_routes(request): "unicast": { "redistribute": [ {"redist_type": "static"}, - {"redist_type": "connected"} + {"redist_type": "connected"}, ] } } @@ -369,17 +436,16 @@ def test_static_routes(request): } result = create_router_bgp(tgen, topo, input_dict_1) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) # Verifying RIB routes - dut = 'r3' - protocol = 'bgp' - next_hop = ['10.0.0.2', '10.0.0.5'] - result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop, - protocol=protocol) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + dut = "r3" + protocol = "bgp" + next_hop = ["10.0.0.2", "10.0.0.5"] + result = verify_rib( + tgen, "ipv4", dut, input_dict, next_hop=next_hop, protocol=protocol + ) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) write_test_footer(tc_name) @@ -389,7 +455,7 @@ def test_admin_distance_for_existing_static_routes(request): tgen = get_topogen() if BGP_CONVERGENCE is not True: - pytest.skip('skipped because of BGP Convergence failure') + pytest.skip("skipped because of BGP Convergence failure") # test case name tc_name = request.node.name @@ -400,21 +466,21 @@ def test_admin_distance_for_existing_static_routes(request): input_dict = { "r1": { - "static_routes": [{ - "network": "10.0.20.1/32", - "admin_distance": 10, - "next_hop": "10.0.0.2" - }] + "static_routes": [ + { + "network": "10.0.20.1/32", + "admin_distance": 10, + "next_hop": "10.0.0.2", + } + ] } } result = create_static_routes(tgen, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) # Verifying admin distance once modified result = verify_admin_distance_for_static_routes(tgen, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) write_test_footer(tc_name) @@ -424,7 +490,7 @@ def test_advertise_network_using_network_command(request): tgen = get_topogen() if BGP_CONVERGENCE is not True: - pytest.skip('skipped because of BGP Convergence failure') + pytest.skip("skipped because of BGP Convergence failure") # test case name tc_name = request.node.name @@ -441,14 +507,8 @@ def test_advertise_network_using_network_command(request): "ipv4": { "unicast": { "advertise_networks": [ - { - "network": "20.0.0.0/32", - "no_of_network": 10 - }, - { - "network": "30.0.0.0/32", - "no_of_network": 10 - } + {"network": "20.0.0.0/32", "no_of_network": 10}, + {"network": "30.0.0.0/32", "no_of_network": 10}, ] } } @@ -458,15 +518,13 @@ def test_advertise_network_using_network_command(request): } result = create_router_bgp(tgen, topo, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) # Verifying RIB routes - dut = 'r2' + dut = "r2" protocol = "bgp" - result = verify_rib(tgen, 'ipv4', dut, input_dict, protocol=protocol) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) write_test_footer(tc_name) @@ -479,7 +537,7 @@ def test_clear_bgp_and_verify(request): tgen = get_topogen() if BGP_CONVERGENCE is not True: - pytest.skip('skipped because of BGP Convergence failure') + pytest.skip("skipped because of BGP Convergence failure") # test case name tc_name = request.node.name @@ -489,9 +547,245 @@ def test_clear_bgp_and_verify(request): reset_config_on_routers(tgen) # clear ip bgp - result = clear_bgp_and_verify(tgen, topo, 'r1') - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + result = clear_bgp_and_verify(tgen, topo, "r1") + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_BGP_attributes_with_vrf_default_keyword_p0(request): + """ + TC_9: + Verify BGP functionality for default vrf with + "vrf default" keyword. + """ + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # reset_config_on_routers(tgen) + + step("Configure static routes and redistribute in BGP on R3") + for addr_type in ADDR_TYPES: + input_dict = { + "r3": { + "static_routes": [ + { + "network": NETWORK[addr_type][0], + "no_of_ip": 4, + "next_hop": "Null0", + } + ] + } + } + + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + input_dict_2 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Create a route-map to match a specific prefix and modify" + "BGP attributes for matched prefix" + ) + input_dict_2 = { + "r3": { + "prefix_lists": { + "ipv4": { + "ABC": [ + { + "seqid": 10, + "action": "permit", + "network": NETWORK["ipv4"][0], + } + ] + }, + "ipv6": { + "XYZ": [ + { + "seqid": 100, + "action": "permit", + "network": NETWORK["ipv6"][0], + } + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + for addr_type in ADDR_TYPES: + if addr_type == "ipv4": + pf_list = "ABC" + else: + pf_list = "XYZ" + + input_dict_6 = { + "r3": { + "route_maps": { + "BGP_ATTR_{}".format(addr_type): [ + { + "action": "permit", + "seq_id": 10, + "match": {addr_type: {"prefix_lists": pf_list}}, + "set": { + "aspath": {"as_num": 500, "as_action": "prepend"}, + "localpref": 500, + "origin": "egp", + "community": {"num": "500:500", "action": "additive"}, + "large_community": { + "num": "500:500:500", + "action": "additive", + }, + }, + }, + {"action": "permit", "seq_id": 20}, + ] + }, + "BGP_ATTR_{}".format(addr_type): [ + { + "action": "permit", + "seq_id": 100, + "match": {addr_type: {"prefix_lists": pf_list}}, + "set": { + "aspath": {"as_num": 500, "as_action": "prepend"}, + "localpref": 500, + "origin": "egp", + "community": {"num": "500:500", "action": "additive"}, + "large_community": { + "num": "500:500:500", + "action": "additive", + }, + }, + }, + {"action": "permit", "seq_id": 200}, + ], + } + } + + result = create_route_maps(tgen, input_dict_6) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Apply the route-map on R3 in outbound direction for peer R4") + + input_dict_7 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r3": { + "route_maps": [ + { + "name": "BGP_ATTR_ipv4", + "direction": "out", + } + ] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r3": { + "route_maps": [ + { + "name": "BGP_ATTR_ipv6", + "direction": "out", + } + ] + } + } + } + } + } + }, + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_7) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "verify modified attributes for specific prefix with 'vrf default'" + "keyword on R4" + ) + for addr_type in ADDR_TYPES: + dut = "r4" + input_dict = { + "r3": { + "static_routes": [ + { + "network": NETWORK[addr_type][0], + "vrf": "default", + "largeCommunity": "500:500:500", + } + ] + } + } + + result = verify_bgp_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result) + result = verify_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result) + + for addr_type in ADDR_TYPES: + dut = "r4" + input_dict = { + "r3": { + "static_routes": [ + { + "network": NETWORK[addr_type][0], + "vrf": "default", + "community": "500:500", + } + ] + } + } + + result = verify_bgp_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result) + result = verify_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result) + + input_dict_4 = {"largeCommunity": "500:500:500", "community": "500:500"} + + result = verify_bgp_community( + tgen, addr_type, dut, [NETWORK[addr_type][0]], input_dict_4 + ) + assert result is True, "Test case {} : Should fail \n Error: {}".format( + tc_name, result + ) write_test_footer(tc_name) @@ -508,7 +802,7 @@ def test_bgp_with_loopback_interface(request): tgen = get_topogen() if BGP_CONVERGENCE is not True: - pytest.skip('skipped because of BGP Convergence failure') + pytest.skip("skipped because of BGP Convergence failure") # test case name tc_name = request.node.name @@ -517,79 +811,343 @@ def test_bgp_with_loopback_interface(request): # Creating configuration from JSON reset_config_on_routers(tgen) - for routerN in sorted(topo['routers'].keys()): - for bgp_neighbor in \ - topo['routers'][routerN]['bgp']['address_family']['ipv4'][ - 'unicast']['neighbor'].keys(): + for routerN in sorted(topo["routers"].keys()): + for bgp_neighbor in topo["routers"][routerN]["bgp"]["address_family"]["ipv4"][ + "unicast" + ]["neighbor"].keys(): # Adding ['source_link'] = 'lo' key:value pair - topo['routers'][routerN]['bgp']['address_family']['ipv4'][ - 'unicast']['neighbor'][bgp_neighbor]["dest_link"] = { - 'lo': { - "source_link": "lo", - } - } + topo["routers"][routerN]["bgp"]["address_family"]["ipv4"]["unicast"][ + "neighbor" + ][bgp_neighbor]["dest_link"] = {"lo": {"source_link": "lo",}} # Creating configuration from JSON build_config_from_json(tgen, topo) input_dict = { "r1": { - "static_routes": [{ - "network": "1.0.2.17/32", - "next_hop": "10.0.0.2" - }, - { - "network": "1.0.3.17/32", - "next_hop": "10.0.0.6" - } + "static_routes": [ + {"network": "1.0.2.17/32", "next_hop": "10.0.0.2"}, + {"network": "1.0.3.17/32", "next_hop": "10.0.0.6"}, ] }, "r2": { - "static_routes": [{ - "network": "1.0.1.17/32", - "next_hop": "10.0.0.1" - }, - { - "network": "1.0.3.17/32", - "next_hop": "10.0.0.10" - } + "static_routes": [ + {"network": "1.0.1.17/32", "next_hop": "10.0.0.1"}, + {"network": "1.0.3.17/32", "next_hop": "10.0.0.10"}, ] }, "r3": { - "static_routes": [{ - "network": "1.0.1.17/32", - "next_hop": "10.0.0.5" - }, - { - "network": "1.0.2.17/32", - "next_hop": "10.0.0.9" - }, - { - "network": "1.0.4.17/32", - "next_hop": "10.0.0.14" - } + "static_routes": [ + {"network": "1.0.1.17/32", "next_hop": "10.0.0.5"}, + {"network": "1.0.2.17/32", "next_hop": "10.0.0.9"}, + {"network": "1.0.4.17/32", "next_hop": "10.0.0.14"}, ] }, - "r4": { - "static_routes": [{ - "network": "1.0.3.17/32", - "next_hop": "10.0.0.13" - }] - } + "r4": {"static_routes": [{"network": "1.0.3.17/32", "next_hop": "10.0.0.13"}]}, } result = create_static_routes(tgen, input_dict) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) # Api call verify whether BGP is converged result = verify_bgp_convergence(tgen, topo) - assert result is True, "Testcase {} :Failed \n Error: {}". \ - format(tc_name, result) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_bgp_with_loopback_with_same_subnet_p1(request): + """ + Verify routes not installed in zebra when /32 routes received + with loopback BGP session subnet + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + step("Delete BGP seesion created initially") + input_dict_r1 = { + "r1": {"bgp": {"delete": True}}, + "r2": {"bgp": {"delete": True}}, + "r3": {"bgp": {"delete": True}}, + "r4": {"bgp": {"delete": True}}, + } + result = create_router_bgp(tgen, topo, input_dict_r1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Create BGP session over loop address") + topo_modify = deepcopy(topo) + + for routerN in sorted(topo["routers"].keys()): + for addr_type in ADDR_TYPES: + for bgp_neighbor in topo_modify["routers"][routerN]["bgp"][ + "address_family" + ][addr_type]["unicast"]["neighbor"].keys(): + + # Adding ['source_link'] = 'lo' key:value pair + topo_modify["routers"][routerN]["bgp"]["address_family"][addr_type][ + "unicast" + ]["neighbor"][bgp_neighbor]["dest_link"] = { + "lo": {"source_link": "lo", "ebgp_multihop": 2} + } + + result = create_router_bgp(tgen, topo_modify["routers"]) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Disable IPv6 BGP nbr from ipv4 address family") + raw_config = { + "r1": { + "raw_config": [ + "router bgp {}".format(topo["routers"]["r1"]["bgp"]["local_as"]), + "address-family ipv4 unicast", + "no neighbor {} activate".format( + topo["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0] + ), + "no neighbor {} activate".format( + topo["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0] + ), + ] + }, + "r2": { + "raw_config": [ + "router bgp {}".format(topo["routers"]["r2"]["bgp"]["local_as"]), + "address-family ipv4 unicast", + "no neighbor {} activate".format( + topo["routers"]["r1"]["links"]["lo"]["ipv6"].split("/")[0] + ), + "no neighbor {} activate".format( + topo["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0] + ), + ] + }, + "r3": { + "raw_config": [ + "router bgp {}".format(topo["routers"]["r3"]["bgp"]["local_as"]), + "address-family ipv4 unicast", + "no neighbor {} activate".format( + topo["routers"]["r1"]["links"]["lo"]["ipv6"].split("/")[0] + ), + "no neighbor {} activate".format( + topo["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0] + ), + "no neighbor {} activate".format( + topo["routers"]["r4"]["links"]["lo"]["ipv6"].split("/")[0] + ), + ] + }, + "r4": { + "raw_config": [ + "router bgp {}".format(topo["routers"]["r4"]["bgp"]["local_as"]), + "address-family ipv4 unicast", + "no neighbor {} activate".format( + topo["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0] + ), + ] + }, + } + + step("Configure kernel routes") + result = apply_raw_config(tgen, raw_config) + assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result) + + r1_ipv4_lo = topo["routers"]["r1"]["links"]["lo"]["ipv4"] + r1_ipv6_lo = topo["routers"]["r1"]["links"]["lo"]["ipv6"] + r2_ipv4_lo = topo["routers"]["r2"]["links"]["lo"]["ipv4"] + r2_ipv6_lo = topo["routers"]["r2"]["links"]["lo"]["ipv6"] + r3_ipv4_lo = topo["routers"]["r3"]["links"]["lo"]["ipv4"] + r3_ipv6_lo = topo["routers"]["r3"]["links"]["lo"]["ipv6"] + r4_ipv4_lo = topo["routers"]["r4"]["links"]["lo"]["ipv4"] + r4_ipv6_lo = topo["routers"]["r4"]["links"]["lo"]["ipv6"] + + r1_r2 = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0] + r2_r1 = topo["routers"]["r2"]["links"]["r1"]["ipv6"].split("/")[0] + r1_r3 = topo["routers"]["r1"]["links"]["r3"]["ipv6"].split("/")[0] + r3_r1 = topo["routers"]["r3"]["links"]["r1"]["ipv6"].split("/")[0] + r2_r3 = topo["routers"]["r2"]["links"]["r3"]["ipv6"].split("/")[0] + r3_r2 = topo["routers"]["r3"]["links"]["r2"]["ipv6"].split("/")[0] + r3_r4 = topo["routers"]["r3"]["links"]["r4"]["ipv6"].split("/")[0] + r4_r3 = topo["routers"]["r4"]["links"]["r3"]["ipv6"].split("/")[0] + + r1_r2_ipv4 = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0] + r2_r1_ipv4 = topo["routers"]["r2"]["links"]["r1"]["ipv4"].split("/")[0] + r1_r3_ipv4 = topo["routers"]["r1"]["links"]["r3"]["ipv4"].split("/")[0] + r3_r1_ipv4 = topo["routers"]["r3"]["links"]["r1"]["ipv4"].split("/")[0] + r2_r3_ipv4 = topo["routers"]["r2"]["links"]["r3"]["ipv4"].split("/")[0] + r3_r2_ipv4 = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0] + r3_r4_ipv4 = topo["routers"]["r3"]["links"]["r4"]["ipv4"].split("/")[0] + r4_r3_ipv4 = topo["routers"]["r4"]["links"]["r3"]["ipv4"].split("/")[0] + + r1_r2_intf = topo["routers"]["r1"]["links"]["r2"]["interface"] + r2_r1_intf = topo["routers"]["r2"]["links"]["r1"]["interface"] + r1_r3_intf = topo["routers"]["r1"]["links"]["r3"]["interface"] + r3_r1_intf = topo["routers"]["r3"]["links"]["r1"]["interface"] + r2_r3_intf = topo["routers"]["r2"]["links"]["r3"]["interface"] + r3_r2_intf = topo["routers"]["r3"]["links"]["r2"]["interface"] + r3_r4_intf = topo["routers"]["r3"]["links"]["r4"]["interface"] + r4_r3_intf = topo["routers"]["r4"]["links"]["r3"]["interface"] + + ipv4_list = [ + ("r1", r1_r2_intf, r2_ipv4_loopback), + ("r1", r1_r3_intf, r3_ipv4_loopback), + ("r2", r2_r1_intf, r1_ipv4_loopback), + ("r2", r2_r3_intf, r3_ipv4_loopback), + ("r3", r3_r1_intf, r1_ipv4_loopback), + ("r3", r3_r2_intf, r2_ipv4_loopback), + ("r3", r3_r4_intf, r4_ipv4_loopback), + ("r4", r4_r3_intf, r3_ipv4_loopback), + ] + + ipv6_list = [ + ("r1", r1_r2_intf, r2_ipv6_loopback, r2_r1), + ("r1", r1_r3_intf, r3_ipv6_loopback, r3_r1), + ("r2", r2_r1_intf, r1_ipv6_loopback, r1_r2), + ("r2", r2_r3_intf, r3_ipv6_loopback, r3_r2), + ("r3", r3_r1_intf, r1_ipv6_loopback, r1_r3), + ("r3", r3_r2_intf, r2_ipv6_loopback, r2_r3), + ("r3", r3_r4_intf, r4_ipv6_loopback, r4_r3), + ("r4", r4_r3_intf, r3_ipv6_loopback, r3_r4), + ] + + for dut, intf, loop_addr in ipv4_list: + result = addKernelRoute(tgen, dut, intf, loop_addr) + assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result) + + for dut, intf, loop_addr, next_hop in ipv6_list: + result = addKernelRoute(tgen, dut, intf, loop_addr, next_hop) + assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result) + + step("Configure static routes") + + input_dict = { + "r1": { + "static_routes": [ + {"network": r2_ipv4_loopback, "next_hop": r2_r1_ipv4}, + {"network": r3_ipv4_loopback, "next_hop": r3_r1_ipv4}, + {"network": r2_ipv6_loopback, "next_hop": r2_r1}, + {"network": r3_ipv6_loopback, "next_hop": r3_r1}, + ] + }, + "r2": { + "static_routes": [ + {"network": r1_ipv4_loopback, "next_hop": r1_r2_ipv4}, + {"network": r3_ipv4_loopback, "next_hop": r3_r2_ipv4}, + {"network": r1_ipv6_loopback, "next_hop": r1_r2}, + {"network": r3_ipv6_loopback, "next_hop": r3_r2}, + ] + }, + "r3": { + "static_routes": [ + {"network": r1_ipv4_loopback, "next_hop": r1_r3_ipv4}, + {"network": r2_ipv4_loopback, "next_hop": r2_r3_ipv4}, + {"network": r4_ipv4_loopback, "next_hop": r4_r3_ipv4}, + {"network": r1_ipv6_loopback, "next_hop": r1_r3}, + {"network": r2_ipv6_loopback, "next_hop": r2_r3}, + {"network": r4_ipv6_loopback, "next_hop": r4_r3}, + ] + }, + "r4": { + "static_routes": [ + {"network": r3_ipv4_loopback, "next_hop": r3_r4_ipv4}, + {"network": r3_ipv6_loopback, "next_hop": r3_r4}, + ] + }, + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Verify BGP session convergence") + + result = verify_bgp_convergence(tgen, topo_modify) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Configure redistribute connected on R2 and R4") + input_dict_1 = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + "ipv6": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + } + } + }, + "r4": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + "ipv6": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + } + } + }, + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Verify Ipv4 and Ipv6 network installed in R1 RIB but not in FIB") + input_dict_r1 = { + "r1": { + "static_routes": [ + {"network": "1.0.2.17/32"}, + {"network": "2001:db8:f::2:17/128"}, + ] + } + } + + dut = "r1" + protocol = "bgp" + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict_r1, protocol=protocol) + assert result is True, "Testcase {} :Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_fib_routes(tgen, addr_type, dut, input_dict_r1, expected=False) + assert result is not True, "Testcase {} : Failed \n" + "Expected behavior: routes should not present in fib \n" + "Error: {}".format(tc_name, result) + + step("Verify Ipv4 and Ipv6 network installed in r3 RIB but not in FIB") + input_dict_r3 = { + "r3": { + "static_routes": [ + {"network": "1.0.4.17/32"}, + {"network": "2001:db8:f::4:17/128"}, + ] + } + } + dut = "r3" + protocol = "bgp" + for addr_type in ADDR_TYPES: + result = verify_rib( + tgen, addr_type, dut, input_dict_r3, protocol=protocol, fib=None + ) + assert result is True, "Testcase {} :Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_fib_routes(tgen, addr_type, dut, input_dict_r1, expected=False) + assert result is not True, "Testcase {} : Failed \n" + "Expected behavior: routes should not present in fib \n" + "Error: {}".format(tc_name, result) write_test_footer(tc_name) -if __name__ == '__main__': +if __name__ == "__main__": args = ["-s"] + sys.argv[1:] sys.exit(pytest.main(args)) |
