summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tests/topotests/bgp-path-attributes-topo1/__init__.py0
-rw-r--r--tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json220
-rwxr-xr-xtests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py1078
-rw-r--r--tests/topotests/lib/bgp.py264
4 files changed, 1562 insertions, 0 deletions
diff --git a/tests/topotests/bgp-path-attributes-topo1/__init__.py b/tests/topotests/bgp-path-attributes-topo1/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/topotests/bgp-path-attributes-topo1/__init__.py
diff --git a/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json b/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json
new file mode 100644
index 0000000000..15b7ec13be
--- /dev/null
+++ b/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json
@@ -0,0 +1,220 @@
+{
+ "ipv4base":"10.0.0.0",
+ "ipv4mask":30,
+ "ipv6base":"fd00::",
+ "ipv6mask":64,
+ "link_ip_start":{"ipv4":"10.0.0.0", "v4mask":30, "ipv6":"fd00::", "v6mask":64},
+ "lo_prefix":{"ipv4":"1.0.", "v4mask":32, "ipv6":"2001:DB8:F::", "v6mask":128},
+ "routers":{
+ "r1":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r2":{"ipv4":"auto", "ipv6":"auto"},
+ "r3":{"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp":{
+ "local_as":"555",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r1": {"ipv4": "auto", "ipv6": "auto"},
+ "r3": {"ipv4": "auto", "ipv6": "auto"},
+ "r4-link1": {"ipv4": "auto", "ipv6": "auto"},
+ "r4-link2": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp":{
+ "local_as":"555",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r4": {
+ "dest_link": {
+ "r2-link1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r1":{"ipv4":"auto", "ipv6":"auto"},
+ "r2":{"ipv4":"auto", "ipv6":"auto"},
+ "r5":{"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp":{
+ "local_as":"555",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r2": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r5": {
+ "dest_link": {
+ "r3": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r4":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r2-link1": {"ipv4": "auto", "ipv6": "auto"},
+ "r2-link2": {"ipv4": "auto", "ipv6": "auto"},
+ "r6": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp": {
+ "local_as": "666",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r4-link1": {}
+ }
+ },
+ "r6": {
+ "dest_link": {
+ "r4": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r5":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r3": {"ipv4": "auto", "ipv6": "auto"},
+ "r7": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp":{
+ "local_as":"666",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r5": {}
+ }
+ },
+ "r7": {
+ "dest_link": {
+ "r5": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r6":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r4": {"ipv4": "auto", "ipv6": "auto"},
+ "r7": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp":{
+ "local_as":"777",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link": {
+ "r6": {}
+ }
+ },
+ "r7": {
+ "dest_link": {
+ "r6": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r7":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r5": {"ipv4": "auto", "ipv6": "auto"},
+ "r6": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp":{
+ "local_as":"888",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r5": {
+ "dest_link": {
+ "r7": {}
+ }
+ },
+ "r6": {
+ "dest_link": {
+ "r7": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py
new file mode 100755
index 0000000000..7400d9ee54
--- /dev/null
+++ b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py
@@ -0,0 +1,1078 @@
+#!/usr/bin/env python
+
+#
+# Modified work Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Original work Copyright (c) 2018 by Network Device Education
+# Foundation, Inc. ("NetDEF")
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test AS-Path functionality:
+
+Setup module:
+- Create topology (setup module)
+- Bring up topology
+- Verify BGP convergence
+
+Test cases:
+1. Test next_hop attribute and verify best path is installed as per
+ reachable next_hop
+2. Test aspath attribute and verify best path is installed as per
+ shortest AS-Path
+3. Test localpref attribute and verify best path is installed as per
+ shortest local-preference
+4. Test weight attribute and and verify best path is installed as per
+ highest weight
+5. Test origin attribute and verify best path is installed as per
+ IGP>EGP>INCOMPLETE rule
+6. Test med attribute and verify best path is installed as per lowest
+ med value
+7. Test admin distance and verify best path is installed as per lowest
+ admin distance
+
+Teardown module:
+- Bring down the topology
+- stop routers
+
+"""
+
+import os
+import sys
+import pdb
+import json
+import time
+import inspect
+import ipaddress
+from time import sleep
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from mininet.topo import Topo
+from lib import topotest
+from lib.topogen import Topogen, TopoRouter, get_topogen
+
+# Required to instantiate the topology builder class.
+from lib.common_config import (
+ start_topology, stop_topology, write_test_header,
+ write_test_footer, reset_config_on_routers,
+ verify_rib, create_static_routes,
+ create_prefix_lists, verify_prefix_lists,
+ create_route_maps
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence, create_router_bgp,
+ clear_bgp_and_verify, verify_best_path_as_per_bgp_attribute,
+ verify_best_path_as_per_admin_distance
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Reading the data from JSON File for topology creation
+jsonFile = "{}/bgp_path_attributes.json".format(CWD)
+
+try:
+ with open(jsonFile, "r") as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+
+####
+class CreateTopo(Topo):
+ """
+ Test CreateTopo - topology 1
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ "Build function"
+ tgen = get_topogen(self)
+
+ # Building topology and configuration from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: %s", testsuite_run_time)
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(CreateTopo, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Checking BGP convergence
+ result = verify_bgp_convergence(tgen, topo)
+ assert result is True, ("setup_module :Failed \n Error:"
+ " {}".format(result))
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module():
+ """
+ Teardown the pytest environment
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ stop_topology(tgen)
+
+ logger.info("Testsuite end time: %s",
+ time.asctime(time.localtime(time.time())))
+ logger.info("=" * 40)
+
+
+#####################################################
+##
+## Testcases
+##
+#####################################################
+def test_next_hop_attribute(request):
+ """
+ Verifying route are not getting installed in, as next_hop is
+ unreachable, Making next hop reachable using next_hop_self
+ command and verifying routes are installed.
+ """
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r7": {
+ "bgp":{
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r1"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: %s", result)
+
+ # Configure next-hop-self to bgp neighbor
+ input_dict_1 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r1"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_aspath_attribute(request):
+ " Verifying AS_PATH attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r7": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "aspath"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_localpref_attribute(request):
+ " Verifying LOCAL PREFERENCE attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r7": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create Prefix list
+ input_dict_2 = {
+ "r2": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_ls_1": [{
+ "seqid": 10,
+ "network": "200.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create route map
+ input_dict_3 = {
+ "r2": {
+ "route_maps": {
+ "RMAP_LOCAL_PREF": [{
+ "action": "permit",
+ "match": {
+ "ipv4": {
+ "prefix_lists": "pf_ls_1"
+ }
+ },
+ "set": {
+ "localpref": 1000
+ }
+ }]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {
+ "route_maps": [
+ {"name": "RMAP_LOCAL_PREF",
+ "direction": "in"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "localpref"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_weight_attribute(request):
+ " Verifying WEIGHT attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r7": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create Prefix list
+ input_dict_2 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_ls_1": [{
+ "seqid": 10,
+ "network": "200.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create route map
+ input_dict_3 = {
+ "r1": {
+ "route_maps": {
+ "RMAP_WEIGHT": [{
+ "action": "permit",
+ "match": {
+ "ipv4": {
+ "prefix_lists": "pf_ls_1"
+ }
+ },
+ "set": {
+ "weight": 500
+ }
+ }]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {
+ "route_maps": [
+ {"name": "RMAP_WEIGHT",
+ "direction": "in"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "weight"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_origin_attribute(request):
+ " Verifying ORIGIN attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r4": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r5": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to create static routes
+ input_dict_3 = {
+ "r5": {
+ "static_routes": [
+ {
+ "network": "200.50.2.0/32",
+ "next_hop": "10.0.0.26"
+ },
+ {
+ "network": "200.60.2.0/32",
+ "next_hop": "10.0.0.26"
+ }
+ ]
+ }
+ }
+ result = create_static_routes(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+
+ # Configure next-hop-self to bgp neighbor
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "origin"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_med_attribute(request):
+ " Verifying MED attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r4": {
+ "bgp":{
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to advertise networks
+
+
+ # Configure next-hop-self to bgp neighbor
+
+
+ # Create Prefix list
+ input_dict_3 = {
+ "r2": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_ls_r2": [{
+ "seqid": 10,
+ "network": "200.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ },
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_ls_r3": [{
+ "seqid": 10,
+ "network": "200.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create route map
+ input_dict_3 = {
+ "r2": {
+ "route_maps": {
+ "RMAP_MED_R2": [{
+ "action": "permit",
+ "match": {
+ "ipv4": {
+ "prefix_lists": "pf_ls_r2"
+ }
+ },
+ "set": {
+ "med": 100
+ }
+ }]
+ }
+ },
+ "r3": {
+ "route_maps": {
+ "RMAP_MED_R3": [{
+ "action": "permit",
+ "match": {
+ "ipv4": {
+ "prefix_lists": "pf_ls_r3"
+ }
+ },
+ "set": {
+ "med": 10
+ }
+ }]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r5": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link": {
+ "r2-link1": {
+ "route_maps": [
+ {"name": "RMAP_MED_R2",
+ "direction": "in"}
+ ]
+ }
+ }
+ },
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ },
+ "r5": {
+ "dest_link": {
+ "r3": {
+ "route_maps": [
+ {"name": "RMAP_MED_R3",
+ "direction": "in"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "med"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ logger.info("Testcase %s :Passed \n", tc_name)
+
+ # Uncomment next line for debugging
+ # tgen.mininet_cli()
+
+
+def test_admin_distance(request):
+ " Verifying admin distance functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to create static routes
+ input_dict = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": "200.50.2.0/32",
+ "admin_distance": 80,
+ "next_hop": "10.0.0.14"
+ },
+ {
+ "network": "200.50.2.0/32",
+ "admin_distance": 60,
+ "next_hop": "10.0.0.18"
+ }
+ ]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+ input_dict_2 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "admin_distance"
+ result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index a4b65cb987..34b48eed12 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -1256,3 +1256,267 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
logger.info("Exiting lib API: verify_bgp_timers_and_functionality()")
return True
+
+def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
+ attribute):
+ """
+ API is to verify best path according to BGP attributes for given routes.
+ "show bgp ipv4/6 json" command will be run and verify best path according
+ to shortest as-path, highest local-preference and med, lowest weight and
+ route origin IGP>EGP>INCOMPLETE.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `tgen` : topogen object
+ * `attribute` : calculate best path using this attribute
+ * `input_dict`: defines different routes to calculate for which route
+ best path is selected
+
+ Usage
+ -----
+ # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
+ router r7 to router r1(DUT) as per shortest as-path attribute
+ input_dict = {
+ "r7": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ attribute = "localpref"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
+ input_dict, attribute)
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_best_path_as_per_bgp_attribute()")
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ # TODO get addr_type from address
+ # Verifying show bgp json
+ command = "show bgp {} json".format(addr_type)
+
+ sleep(2)
+ logger.info("Verifying router %s RIB for best path:", router)
+ sh_ip_bgp_json = rnode.vtysh_cmd(command, isjson=True)
+
+ for route_val in input_dict.values():
+ net_data = route_val["bgp"]["address_family"]["ipv4"]["unicast"]
+ networks = net_data["advertise_networks"]
+ for network in networks:
+ route = network["network"]
+
+ route_attributes = sh_ip_bgp_json["routes"][route]
+ _next_hop = None
+ compare = None
+ attribute_dict = {}
+ for route_attribute in route_attributes:
+ next_hops = route_attribute["nexthops"]
+ for next_hop in next_hops:
+ next_hop_ip = next_hop["ip"]
+ attribute_dict[next_hop_ip] = route_attribute[attribute]
+
+ # AS_PATH attribute
+ if attribute == "aspath":
+ # Find next_hop for the route have minimum as_path
+ _next_hop = min(attribute_dict, key=lambda x: len(set(
+ attribute_dict[x])))
+ compare = "SHORTEST"
+
+ # LOCAL_PREF attribute
+ elif attribute == "localpref":
+ # Find next_hop for the route have highest local preference
+ _next_hop = max(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "HIGHEST"
+
+ # WEIGHT attribute
+ elif attribute == "weight":
+ # Find next_hop for the route have highest weight
+ _next_hop = max(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "HIGHEST"
+
+ # ORIGIN attribute
+ elif attribute == "origin":
+ # Find next_hop for the route have IGP as origin, -
+ # - rule is IGP>EGP>INCOMPLETE
+ _next_hop = [key for (key, value) in
+ attribute_dict.iteritems()
+ if value == "IGP"][0]
+ compare = ""
+
+ # MED attribute
+ elif attribute == "med":
+ # Find next_hop for the route have LOWEST MED
+ _next_hop = min(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "LOWEST"
+
+ # Show ip route
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if not bool(rib_routes_json):
+ errormsg = "No route found in RIB of router {}..". \
+ format(router)
+ return errormsg
+
+ st_found = False
+ nh_found = False
+ # Find best is installed in RIB
+ if route in rib_routes_json:
+ st_found = True
+ # Verify next_hop in rib_routes_json
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] == \
+ _next_hop:
+ nh_found = True
+ else:
+ errormsg = "Incorrect Nexthop for BGP route {} in " \
+ "RIB of router {}, Expected: {}, Found:" \
+ " {}\n".format(route, router,
+ rib_routes_json[route][0][
+ "nexthops"][0]["ip"],
+ _next_hop)
+ return errormsg
+
+ if st_found and nh_found:
+ logger.info(
+ "Best path for prefix: %s with next_hop: %s is "
+ "installed according to %s %s: (%s) in RIB of "
+ "router %s", route, _next_hop, compare,
+ attribute, attribute_dict[_next_hop], router)
+
+ logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()")
+ return True
+
+
+def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
+ attribute):
+ """
+ API is to verify best path according to admin distance for given
+ route. "show ip/ipv6 route json" command will be run and verify
+ best path accoring to shortest admin distanc.
+
+ Parameters
+ ----------
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test
+ * `tgen` : topogen object
+ * `attribute` : calculate best path using admin distance
+ * `input_dict`: defines different routes with different admin distance
+ to calculate for which route best path is selected
+ Usage
+ -----
+ # To verify best path for route 200.50.2.0/32 from router r2 to
+ router r1(DUT) as per shortest admin distance which is 60.
+ input_dict = {
+ "r2": {
+ "static_routes": [{"network": "200.50.2.0/32", \
+ "admin_distance": 80, "next_hop": "10.0.0.14"},
+ {"network": "200.50.2.0/32", \
+ "admin_distance": 60, "next_hop": "10.0.0.18"}]
+ }}
+ attribute = "localpref"
+ result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
+ input_dict, attribute):
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_best_path_as_per_admin_distance()")
+ router_list = tgen.routers()
+ if router not in router_list:
+ return False
+
+ rnode = tgen.routers()[router]
+
+ sleep(2)
+ logger.info("Verifying router %s RIB for best path:", router)
+
+ # Show ip route cmd
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+
+ for routes_from_router in input_dict.keys():
+ sh_ip_route_json = router_list[routes_from_router].vtysh_cmd(
+ command, isjson=True)
+ networks = input_dict[routes_from_router]["static_routes"]
+ for network in networks:
+ route = network["network"]
+
+ route_attributes = sh_ip_route_json[route]
+ _next_hop = None
+ compare = None
+ attribute_dict = {}
+ for route_attribute in route_attributes:
+ next_hops = route_attribute["nexthops"]
+ for next_hop in next_hops:
+ next_hop_ip = next_hop["ip"]
+ attribute_dict[next_hop_ip] = route_attribute["distance"]
+
+ # Find next_hop for the route have LOWEST Admin Distance
+ _next_hop = min(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "LOWEST"
+
+ # Show ip route
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if not bool(rib_routes_json):
+ errormsg = "No route found in RIB of router {}..".format(router)
+ return errormsg
+
+ st_found = False
+ nh_found = False
+ # Find best is installed in RIB
+ if route in rib_routes_json:
+ st_found = True
+ # Verify next_hop in rib_routes_json
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] == \
+ _next_hop:
+ nh_found = True
+ else:
+ errormsg = ("Nexthop {} is Missing for BGP route {}"
+ " in RIB of router {}\n".format(_next_hop,
+ route, router))
+ return errormsg
+
+ if st_found and nh_found:
+ logger.info("Best path for prefix: %s is installed according"
+ " to %s %s: (%s) in RIB of router %s", route,
+ compare, attribute,
+ attribute_dict[_next_hop], router)
+
+ logger.info(
+ "Exiting lib API: verify_best_path_as_per_admin_distance()")
+ return True