diff options
| -rw-r--r-- | tests/topotests/bgp-path-attributes-topo1/__init__.py | 0 | ||||
| -rw-r--r-- | tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json | 220 | ||||
| -rwxr-xr-x | tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py | 1078 | ||||
| -rw-r--r-- | tests/topotests/lib/bgp.py | 264 |
4 files changed, 1562 insertions, 0 deletions
diff --git a/tests/topotests/bgp-path-attributes-topo1/__init__.py b/tests/topotests/bgp-path-attributes-topo1/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/topotests/bgp-path-attributes-topo1/__init__.py diff --git a/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json b/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json new file mode 100644 index 0000000000..15b7ec13be --- /dev/null +++ b/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json @@ -0,0 +1,220 @@ +{ + "ipv4base":"10.0.0.0", + "ipv4mask":30, + "ipv6base":"fd00::", + "ipv6mask":64, + "link_ip_start":{"ipv4":"10.0.0.0", "v4mask":30, "ipv6":"fd00::", "v6mask":64}, + "lo_prefix":{"ipv4":"1.0.", "v4mask":32, "ipv6":"2001:DB8:F::", "v6mask":128}, + "routers":{ + "r1":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r2":{"ipv4":"auto", "ipv6":"auto"}, + "r3":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"555", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + }, + "r3": { + "dest_link": { + "r1": {} + } + } + } + } + } + } + } + }, + "r2":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r1": {"ipv4": "auto", "ipv6": "auto"}, + "r3": {"ipv4": "auto", "ipv6": "auto"}, + "r4-link1": {"ipv4": "auto", "ipv6": "auto"}, + "r4-link2": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp":{ + "local_as":"555", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + }, + "r4": { + "dest_link": { + "r2-link1": {} + } + } + } + } + } + } + } + }, + "r3":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r1":{"ipv4":"auto", "ipv6":"auto"}, + "r2":{"ipv4":"auto", "ipv6":"auto"}, + "r5":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"555", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {} + } + }, + "r2": { + "dest_link": { + "r3": {} + } + }, + "r5": { + "dest_link": { + "r3": {} + } + } + } + } + } + } + } + }, + "r4":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r2-link1": {"ipv4": "auto", "ipv6": "auto"}, + "r2-link2": {"ipv4": "auto", "ipv6": "auto"}, + "r6": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp": { + "local_as": "666", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r4-link1": {} + } + }, + "r6": { + "dest_link": { + "r4": {} + } + } + } + } + } + } + } + }, + "r5":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r3": {"ipv4": "auto", "ipv6": "auto"}, + "r7": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp":{ + "local_as":"666", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r5": {} + } + }, + "r7": { + "dest_link": { + "r5": {} + } + } + } + } + } + } + } + }, + "r6":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r4": {"ipv4": "auto", "ipv6": "auto"}, + "r7": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp":{ + "local_as":"777", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r6": {} + } + }, + "r7": { + "dest_link": { + "r6": {} + } + } + } + } + } + } + } + }, + "r7":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r5": {"ipv4": "auto", "ipv6": "auto"}, + "r6": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp":{ + "local_as":"888", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r5": { + "dest_link": { + "r7": {} + } + }, + "r6": { + "dest_link": { + "r7": {} + } + } + } + } + } + } + } + } + } +} diff --git a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py new file mode 100755 index 0000000000..7400d9ee54 --- /dev/null +++ b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py @@ -0,0 +1,1078 @@ +#!/usr/bin/env python + +# +# Modified work Copyright (c) 2019 by VMware, Inc. ("VMware") +# Original work Copyright (c) 2018 by Network Device Education +# Foundation, Inc. ("NetDEF") +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +Following tests are covered to test AS-Path functionality: + +Setup module: +- Create topology (setup module) +- Bring up topology +- Verify BGP convergence + +Test cases: +1. Test next_hop attribute and verify best path is installed as per + reachable next_hop +2. Test aspath attribute and verify best path is installed as per + shortest AS-Path +3. Test localpref attribute and verify best path is installed as per + shortest local-preference +4. Test weight attribute and and verify best path is installed as per + highest weight +5. Test origin attribute and verify best path is installed as per + IGP>EGP>INCOMPLETE rule +6. Test med attribute and verify best path is installed as per lowest + med value +7. Test admin distance and verify best path is installed as per lowest + admin distance + +Teardown module: +- Bring down the topology +- stop routers + +""" + +import os +import sys +import pdb +import json +import time +import inspect +import ipaddress +from time import sleep +import pytest + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from mininet.topo import Topo +from lib import topotest +from lib.topogen import Topogen, TopoRouter, get_topogen + +# Required to instantiate the topology builder class. +from lib.common_config import ( + start_topology, stop_topology, write_test_header, + write_test_footer, reset_config_on_routers, + verify_rib, create_static_routes, + create_prefix_lists, verify_prefix_lists, + create_route_maps +) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence, create_router_bgp, + clear_bgp_and_verify, verify_best_path_as_per_bgp_attribute, + verify_best_path_as_per_admin_distance +) +from lib.topojson import build_topo_from_json, build_config_from_json + +# Reading the data from JSON File for topology creation +jsonFile = "{}/bgp_path_attributes.json".format(CWD) + +try: + with open(jsonFile, "r") as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + + +#### +class CreateTopo(Topo): + """ + Test CreateTopo - topology 1 + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + "Build function" + tgen = get_topogen(self) + + # Building topology and configuration from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: %s", testsuite_run_time) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(CreateTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Checking BGP convergence + result = verify_bgp_convergence(tgen, topo) + assert result is True, ("setup_module :Failed \n Error:" + " {}".format(result)) + + logger.info("Running setup_module() done") + + +def teardown_module(): + """ + Teardown the pytest environment + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + stop_topology(tgen) + + logger.info("Testsuite end time: %s", + time.asctime(time.localtime(time.time()))) + logger.info("=" * 40) + + +##################################################### +## +## Testcases +## +##################################################### +def test_next_hop_attribute(request): + """ + Verifying route are not getting installed in, as next_hop is + unreachable, Making next hop reachable using next_hop_self + command and verifying routes are installed. + """ + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r7": { + "bgp":{ + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r1" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: %s", result) + + # Configure next-hop-self to bgp neighbor + input_dict_1 = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r1" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_aspath_attribute(request): + " Verifying AS_PATH attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r7": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "aspath" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_localpref_attribute(request): + " Verifying LOCAL PREFERENCE attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r7": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create Prefix list + input_dict_2 = { + "r2": { + "prefix_lists": { + "ipv4": { + "pf_ls_1": [{ + "seqid": 10, + "network": "200.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + input_dict_3 = { + "r2": { + "route_maps": { + "RMAP_LOCAL_PREF": [{ + "action": "permit", + "match": { + "ipv4": { + "prefix_lists": "pf_ls_1" + } + }, + "set": { + "localpref": 1000 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": "RMAP_LOCAL_PREF", + "direction": "in"} + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "localpref" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_weight_attribute(request): + " Verifying WEIGHT attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r7": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create Prefix list + input_dict_2 = { + "r1": { + "prefix_lists": { + "ipv4": { + "pf_ls_1": [{ + "seqid": 10, + "network": "200.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + input_dict_3 = { + "r1": { + "route_maps": { + "RMAP_WEIGHT": [{ + "action": "permit", + "match": { + "ipv4": { + "prefix_lists": "pf_ls_1" + } + }, + "set": { + "weight": 500 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": { + "route_maps": [ + {"name": "RMAP_WEIGHT", + "direction": "in"} + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "weight" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_origin_attribute(request): + " Verifying ORIGIN attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r4": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r5": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to create static routes + input_dict_3 = { + "r5": { + "static_routes": [ + { + "network": "200.50.2.0/32", + "next_hop": "10.0.0.26" + }, + { + "network": "200.60.2.0/32", + "next_hop": "10.0.0.26" + } + ] + } + } + result = create_static_routes(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + + # Configure next-hop-self to bgp neighbor + + # Verifying best path + dut = "r1" + attribute = "origin" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_med_attribute(request): + " Verifying MED attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r4": { + "bgp":{ + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to advertise networks + + + # Configure next-hop-self to bgp neighbor + + + # Create Prefix list + input_dict_3 = { + "r2": { + "prefix_lists": { + "ipv4": { + "pf_ls_r2": [{ + "seqid": 10, + "network": "200.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + }, + "r3": { + "prefix_lists": { + "ipv4": { + "pf_ls_r3": [{ + "seqid": 10, + "network": "200.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + input_dict_3 = { + "r2": { + "route_maps": { + "RMAP_MED_R2": [{ + "action": "permit", + "match": { + "ipv4": { + "prefix_lists": "pf_ls_r2" + } + }, + "set": { + "med": 100 + } + }] + } + }, + "r3": { + "route_maps": { + "RMAP_MED_R3": [{ + "action": "permit", + "match": { + "ipv4": { + "prefix_lists": "pf_ls_r3" + } + }, + "set": { + "med": 10 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + "r5": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r2-link1": { + "route_maps": [ + {"name": "RMAP_MED_R2", + "direction": "in"} + ] + } + } + }, + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + }, + "r5": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": "RMAP_MED_R3", + "direction": "in"} + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "med" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + logger.info("Testcase %s :Passed \n", tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_admin_distance(request): + " Verifying admin distance functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to create static routes + input_dict = { + "r2": { + "static_routes": [ + { + "network": "200.50.2.0/32", + "admin_distance": 80, + "next_hop": "10.0.0.14" + }, + { + "network": "200.50.2.0/32", + "admin_distance": 60, + "next_hop": "10.0.0.18" + } + ] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_2 = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "admin_distance" + result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index a4b65cb987..34b48eed12 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -1256,3 +1256,267 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): logger.info("Exiting lib API: verify_bgp_timers_and_functionality()") return True + +def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, + attribute): + """ + API is to verify best path according to BGP attributes for given routes. + "show bgp ipv4/6 json" command will be run and verify best path according + to shortest as-path, highest local-preference and med, lowest weight and + route origin IGP>EGP>INCOMPLETE. + + Parameters + ---------- + * `tgen` : topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `tgen` : topogen object + * `attribute` : calculate best path using this attribute + * `input_dict`: defines different routes to calculate for which route + best path is selected + + Usage + ----- + # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from + router r7 to router r1(DUT) as per shortest as-path attribute + input_dict = { + "r7": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + } + } + attribute = "localpref" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \ + input_dict, attribute) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_best_path_as_per_bgp_attribute()") + if router not in tgen.routers(): + return False + + rnode = tgen.routers()[router] + + # TODO get addr_type from address + # Verifying show bgp json + command = "show bgp {} json".format(addr_type) + + sleep(2) + logger.info("Verifying router %s RIB for best path:", router) + sh_ip_bgp_json = rnode.vtysh_cmd(command, isjson=True) + + for route_val in input_dict.values(): + net_data = route_val["bgp"]["address_family"]["ipv4"]["unicast"] + networks = net_data["advertise_networks"] + for network in networks: + route = network["network"] + + route_attributes = sh_ip_bgp_json["routes"][route] + _next_hop = None + compare = None + attribute_dict = {} + for route_attribute in route_attributes: + next_hops = route_attribute["nexthops"] + for next_hop in next_hops: + next_hop_ip = next_hop["ip"] + attribute_dict[next_hop_ip] = route_attribute[attribute] + + # AS_PATH attribute + if attribute == "aspath": + # Find next_hop for the route have minimum as_path + _next_hop = min(attribute_dict, key=lambda x: len(set( + attribute_dict[x]))) + compare = "SHORTEST" + + # LOCAL_PREF attribute + elif attribute == "localpref": + # Find next_hop for the route have highest local preference + _next_hop = max(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "HIGHEST" + + # WEIGHT attribute + elif attribute == "weight": + # Find next_hop for the route have highest weight + _next_hop = max(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "HIGHEST" + + # ORIGIN attribute + elif attribute == "origin": + # Find next_hop for the route have IGP as origin, - + # - rule is IGP>EGP>INCOMPLETE + _next_hop = [key for (key, value) in + attribute_dict.iteritems() + if value == "IGP"][0] + compare = "" + + # MED attribute + elif attribute == "med": + # Find next_hop for the route have LOWEST MED + _next_hop = min(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "LOWEST" + + # Show ip route + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + + rib_routes_json = rnode.vtysh_cmd(command, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if not bool(rib_routes_json): + errormsg = "No route found in RIB of router {}..". \ + format(router) + return errormsg + + st_found = False + nh_found = False + # Find best is installed in RIB + if route in rib_routes_json: + st_found = True + # Verify next_hop in rib_routes_json + if rib_routes_json[route][0]["nexthops"][0]["ip"] == \ + _next_hop: + nh_found = True + else: + errormsg = "Incorrect Nexthop for BGP route {} in " \ + "RIB of router {}, Expected: {}, Found:" \ + " {}\n".format(route, router, + rib_routes_json[route][0][ + "nexthops"][0]["ip"], + _next_hop) + return errormsg + + if st_found and nh_found: + logger.info( + "Best path for prefix: %s with next_hop: %s is " + "installed according to %s %s: (%s) in RIB of " + "router %s", route, _next_hop, compare, + attribute, attribute_dict[_next_hop], router) + + logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()") + return True + + +def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, + attribute): + """ + API is to verify best path according to admin distance for given + route. "show ip/ipv6 route json" command will be run and verify + best path accoring to shortest admin distanc. + + Parameters + ---------- + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test + * `tgen` : topogen object + * `attribute` : calculate best path using admin distance + * `input_dict`: defines different routes with different admin distance + to calculate for which route best path is selected + Usage + ----- + # To verify best path for route 200.50.2.0/32 from router r2 to + router r1(DUT) as per shortest admin distance which is 60. + input_dict = { + "r2": { + "static_routes": [{"network": "200.50.2.0/32", \ + "admin_distance": 80, "next_hop": "10.0.0.14"}, + {"network": "200.50.2.0/32", \ + "admin_distance": 60, "next_hop": "10.0.0.18"}] + }} + attribute = "localpref" + result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \ + input_dict, attribute): + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_best_path_as_per_admin_distance()") + router_list = tgen.routers() + if router not in router_list: + return False + + rnode = tgen.routers()[router] + + sleep(2) + logger.info("Verifying router %s RIB for best path:", router) + + # Show ip route cmd + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + + for routes_from_router in input_dict.keys(): + sh_ip_route_json = router_list[routes_from_router].vtysh_cmd( + command, isjson=True) + networks = input_dict[routes_from_router]["static_routes"] + for network in networks: + route = network["network"] + + route_attributes = sh_ip_route_json[route] + _next_hop = None + compare = None + attribute_dict = {} + for route_attribute in route_attributes: + next_hops = route_attribute["nexthops"] + for next_hop in next_hops: + next_hop_ip = next_hop["ip"] + attribute_dict[next_hop_ip] = route_attribute["distance"] + + # Find next_hop for the route have LOWEST Admin Distance + _next_hop = min(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "LOWEST" + + # Show ip route + rib_routes_json = rnode.vtysh_cmd(command, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if not bool(rib_routes_json): + errormsg = "No route found in RIB of router {}..".format(router) + return errormsg + + st_found = False + nh_found = False + # Find best is installed in RIB + if route in rib_routes_json: + st_found = True + # Verify next_hop in rib_routes_json + if rib_routes_json[route][0]["nexthops"][0]["ip"] == \ + _next_hop: + nh_found = True + else: + errormsg = ("Nexthop {} is Missing for BGP route {}" + " in RIB of router {}\n".format(_next_hop, + route, router)) + return errormsg + + if st_found and nh_found: + logger.info("Best path for prefix: %s is installed according" + " to %s %s: (%s) in RIB of router %s", route, + compare, attribute, + attribute_dict[_next_hop], router) + + logger.info( + "Exiting lib API: verify_best_path_as_per_admin_distance()") + return True |
