]> git.puffer.fish Git - matthieu/frr.git/commitdiff
tests: Adding 5 test cases to bgp-basic suite
authorAshish Pant <ashish12pant@gmail.com>
Tue, 25 Jun 2019 01:03:09 +0000 (06:33 +0530)
committerAshish Pant <ashish12pant@gmail.com>
Tue, 9 Jul 2019 04:56:53 +0000 (10:26 +0530)
Signed-off-by: Ashish Pant <ashish12pant@gmail.com>
Adding test cases and verfication API for new test cases

tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
tests/topotests/lib/bgp.py
tests/topotests/lib/common_config.py

index 040b2f4a439b7a7dfd88d1a0178526f8d78077ec..1d6ea0cda00dc2b726e5705d5722ad07ccdad821 100755 (executable)
@@ -53,7 +53,8 @@ from mininet.topo import Topo
 
 from lib.common_config import (
     start_topology, stop_topology, write_test_header,
-    write_test_footer, reset_config_on_routers
+    write_test_footer, reset_config_on_routers, create_static_routes,
+    verify_rib, verify_admin_distance_for_static_routes
 )
 from lib.topolog import logger
 from lib.bgp import (
@@ -311,6 +312,277 @@ def test_bgp_timers_functionality(request):
     write_test_footer(tc_name)
 
 
+
+
+def test_static_routes(request):
+    """ Test to create and verify static routes. """
+
+    tgen = get_topogen()
+    if BGP_CONVERGENCE is not True:
+        pytest.skip('skipped because of BGP Convergence failure')
+
+    # test case name
+    tc_name = request.node.name
+    write_test_header(tc_name)
+
+    # Creating configuration from JSON
+    reset_config_on_routers(tgen)
+
+    # Api call to create static routes
+    input_dict = {
+        "r1": {
+            "static_routes": [{
+                "network": "10.0.20.1/32",
+                "no_of_ip": 9,
+                "admin_distance": 100,
+                "next_hop": "10.0.0.2"
+            }]
+        }
+    }
+    result = create_static_routes(tgen, input_dict)
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    # Api call to redistribute static routes
+    input_dict_1 = {
+        "r1": {
+            "bgp": {
+                "address_family": {
+                    "ipv4": {
+                        "unicast": {
+                            "redistribute": [
+                                {"redist_type": "static"},
+                                {"redist_type": "connected"}
+                            ]
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    result = create_router_bgp(tgen, topo, input_dict_1)
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    # Verifying RIB routes
+    dut = 'r3'
+    protocol = 'bgp'
+    next_hop = '10.0.0.2'
+    result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop,
+                        protocol=protocol)
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    write_test_footer(tc_name)
+
+
+def test_admin_distance_for_existing_static_routes(request):
+    """ Test to modify and verify admin distance for existing static routes."""
+
+    tgen = get_topogen()
+    if BGP_CONVERGENCE is not True:
+        pytest.skip('skipped because of BGP Convergence failure')
+
+    # test case name
+    tc_name = request.node.name
+    write_test_header(tc_name)
+
+    # Creating configuration from JSON
+    reset_config_on_routers(tgen)
+
+    input_dict = {
+        "r1": {
+            "static_routes": [{
+                "network": "10.0.20.1/32",
+                "admin_distance": 10,
+                "next_hop": "10.0.0.2"
+            }]
+        }
+    }
+    result = create_static_routes(tgen, input_dict)
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    # Verifying admin distance  once modified
+    result = verify_admin_distance_for_static_routes(tgen, input_dict)
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    write_test_footer(tc_name)
+
+
+def test_advertise_network_using_network_command(request):
+    """ Test advertise networks using network command."""
+
+    tgen = get_topogen()
+    if BGP_CONVERGENCE is not True:
+        pytest.skip('skipped because of BGP Convergence failure')
+
+    # test case name
+    tc_name = request.node.name
+    write_test_header(tc_name)
+
+    # Creating configuration from JSON
+    reset_config_on_routers(tgen)
+
+    # Api call to advertise networks
+    input_dict = {
+        "r1": {
+            "bgp": {
+                "address_family": {
+                    "ipv4": {
+                        "unicast": {
+                            "advertise_networks": [
+                                {
+                                    "network": "20.0.0.0/32",
+                                    "no_of_network": 10
+                                },
+                                {
+                                    "network": "30.0.0.0/32",
+                                    "no_of_network": 10
+                                }
+                            ]
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    result = create_router_bgp(tgen, topo, input_dict)
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    # Verifying RIB routes
+    dut = 'r2'
+    protocol = "bgp"
+    result = verify_rib(tgen, 'ipv4', dut, input_dict, protocol=protocol)
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    write_test_footer(tc_name)
+
+
+def test_clear_bgp_and_verify(request):
+    """
+    Created few static routes and verified all routes are learned via BGP
+    cleared BGP and verified all routes are intact
+    """
+
+    tgen = get_topogen()
+    if BGP_CONVERGENCE is not True:
+        pytest.skip('skipped because of BGP Convergence failure')
+
+    # test case name
+    tc_name = request.node.name
+    write_test_header(tc_name)
+
+    # Creating configuration from JSON
+    reset_config_on_routers(tgen)
+
+    # clear ip bgp
+    result = clear_bgp_and_verify(tgen, topo, 'r1')
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    write_test_footer(tc_name)
+
+
+def test_bgp_with_loopback_interface(request):
+    """
+    Test BGP with loopback interface
+
+    Adding keys:value pair  "dest_link": "lo" and "source_link": "lo"
+    peer dict of input json file for all router's creating config using
+    loopback interface. Once BGP neighboship is up then verifying BGP
+    convergence
+    """
+
+    tgen = get_topogen()
+    if BGP_CONVERGENCE is not True:
+        pytest.skip('skipped because of BGP Convergence failure')
+
+    # test case name
+    tc_name = request.node.name
+    write_test_header(tc_name)
+
+    # Creating configuration from JSON
+    reset_config_on_routers(tgen)
+
+    for routerN in sorted(topo['routers'].keys()):
+        for bgp_neighbor in \
+                topo['routers'][routerN]['bgp']['address_family']['ipv4'][
+                'unicast']['neighbor'].keys():
+
+            # Adding ['source_link'] = 'lo' key:value pair
+            topo['routers'][routerN]['bgp']['address_family']['ipv4'][
+                'unicast']['neighbor'][bgp_neighbor]["dest_link"] = {
+                    'lo': {
+                        "source_link": "lo",
+                    }
+                }
+
+    # Creating configuration from JSON
+    build_config_from_json(tgen, topo)
+
+    input_dict = {
+        "r1": {
+            "static_routes": [{
+                    "network": "1.0.2.17/32",
+                    "next_hop": "10.0.0.2"
+                },
+                {
+                    "network": "1.0.3.17/32",
+                    "next_hop": "10.0.0.6"
+                }
+            ]
+        },
+        "r2": {
+            "static_routes": [{
+                    "network": "1.0.1.17/32",
+                    "next_hop": "10.0.0.1"
+                },
+                {
+                    "network": "1.0.3.17/32",
+                    "next_hop": "10.0.0.10"
+                }
+            ]
+        },
+        "r3": {
+            "static_routes": [{
+                    "network": "1.0.1.17/32",
+                    "next_hop": "10.0.0.5"
+                },
+                {
+                    "network": "1.0.2.17/32",
+                    "next_hop": "10.0.0.9"
+                },
+                {
+                    "network": "1.0.4.17/32",
+                    "next_hop": "10.0.0.14"
+                }
+            ]
+        },
+        "r4": {
+            "static_routes": [{
+                "network": "1.0.3.17/32",
+                "next_hop": "10.0.0.13"
+            }]
+        }
+    }
+    result = create_static_routes(tgen, input_dict)
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    # Api call verify whether BGP is converged
+    result = verify_bgp_convergence(tgen, topo)
+    assert result is True, "Testcase {} :Failed \n Error: {}". \
+        format(tc_name, result)
+
+    write_test_footer(tc_name)
+
+
 if __name__ == '__main__':
     args = ["-s"] + sys.argv[1:]
     sys.exit(pytest.main(args))
index 04e4b6ebacff9071a17e28b98ad90641677f37d5..a4b65cb9872741e3ec8190ae5dd05c5b384d27d1 100644 (file)
@@ -883,14 +883,6 @@ def clear_bgp_and_verify(tgen, topo, router):
     peer_uptime_before_clear_bgp = {}
     # Verifying BGP convergence before bgp clear command
     for retry in range(1, 11):
-        show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
-                                        isjson=True)
-        logger.info(show_bgp_json)
-        # Verifying output dictionary show_bgp_json is empty or not
-        if not bool(show_bgp_json):
-            errormsg = "BGP is not running"
-            return errormsg
-
         sleeptime = 2 * retry
         if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
             # Waiting for BGP to converge
@@ -902,6 +894,14 @@ def clear_bgp_and_verify(tgen, topo, router):
                        " router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
             return errormsg
 
+        show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+                                        isjson=True)
+        logger.info(show_bgp_json)
+        # Verifying output dictionary show_bgp_json is empty or not
+        if not bool(show_bgp_json):
+            errormsg = "BGP is not running"
+            return errormsg
+
         # To find neighbor ip type
         bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
         for addr_type in bgp_addr_type.keys():
@@ -964,13 +964,6 @@ def clear_bgp_and_verify(tgen, topo, router):
     peer_uptime_after_clear_bgp = {}
     # Verifying BGP convergence after bgp clear command
     for retry in range(1, 11):
-        show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
-                                        isjson=True)
-        # Verifying output dictionary show_bgp_json is empty or not
-        if not bool(show_bgp_json):
-            errormsg = "BGP is not running"
-            return errormsg
-
         sleeptime = 2 * retry
         if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
             # Waiting for BGP to converge
@@ -982,6 +975,13 @@ def clear_bgp_and_verify(tgen, topo, router):
                        " router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
             return errormsg
 
+        show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+                                        isjson=True)
+        # Verifying output dictionary show_bgp_json is empty or not
+        if not bool(show_bgp_json):
+            errormsg = "BGP is not running"
+            return errormsg
+
         # To find neighbor ip type
         bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
 
index 78d8d022ad40c29e3b04e62fb5048ea8011b1832..99825dd2ad3fb42346cc1c6fafe58286c3962ba0 100644 (file)
@@ -20,6 +20,7 @@
 
 from collections import OrderedDict
 from datetime import datetime
+from time import sleep
 import StringIO
 import os
 import ConfigParser
@@ -1095,3 +1096,253 @@ def create_route_maps(tgen, input_dict, build=False):
     logger.debug("Exiting lib API: create_prefix_lists()")
     return result
 
+
+#############################################
+# Verification APIs
+#############################################
+def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
+    """
+    Data will be read from input_dict or input JSON file, API will generate
+    same prefixes, which were redistributed by either create_static_routes() or
+    advertise_networks_using_network_command() and do will verify next_hop and
+    each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
+    command o/p.
+
+    Parameters
+    ----------
+    * `tgen` : topogen object
+    * `addr_type` : ip type, ipv4/ipv6
+    * `dut`: Device Under Test, for which user wants to test the data
+    * `input_dict` : input dict, has details of static routes
+    * `next_hop`[optional]: next_hop which needs to be verified,
+                           default: static
+    * `protocol`[optional]: protocol, default = None
+
+    Usage
+    -----
+    # RIB can be verified for static routes OR network advertised using
+    network command. Following are input_dicts to create static routes
+    and advertise networks using network command. Any one of the input_dict
+    can be passed to verify_rib() to verify routes in DUT"s RIB.
+
+    # Creating static routes for r1
+    input_dict = {
+        "r1": {
+            "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
+        "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
+        }}
+    # Advertising networks using network command in router r1
+    input_dict = {
+       "r1": {
+          "advertise_networks": [{"start_ip": "20.0.0.0/32",
+                                  "no_of_network": 10},
+                                  {"start_ip": "30.0.0.0/32"}]
+        }}
+    # Verifying ipv4 routes in router r1 learned via BGP
+    dut = "r2"
+    protocol = "bgp"
+    result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
+
+    Returns
+    -------
+    errormsg(str) or True
+    """
+
+    logger.info("Entering lib API: verify_rib()")
+
+    router_list = tgen.routers()
+    for routerInput in input_dict.keys():
+        for router, rnode in router_list.iteritems():
+            if router != dut:
+                continue
+
+            # Verifying RIB routes
+            if addr_type == "ipv4":
+                if protocol:
+                    command = "show ip route {} json".format(protocol)
+                else:
+                    command = "show ip route json"
+            else:
+                if protocol:
+                    command = "show ipv6 route {} json".format(protocol)
+                else:
+                    command = "show ipv6 route json"
+
+            sleep(2)
+            logger.info("Checking router %s RIB:", router)
+            rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+            # Verifying output dictionary rib_routes_json is not empty
+            if bool(rib_routes_json) is False:
+                errormsg = "No {} route found in rib of router {}..". \
+                    format(protocol, router)
+                return errormsg
+
+            if "static_routes" in input_dict[routerInput]:
+                static_routes = input_dict[routerInput]["static_routes"]
+                st_found = False
+                nh_found = False
+                found_routes = []
+                missing_routes = []
+                for static_route in static_routes:
+                    network = static_route["network"]
+                    if "no_of_ip" in static_route:
+                        no_of_ip = static_route["no_of_ip"]
+                    else:
+                        no_of_ip = 0
+
+                    # Generating IPs for verification
+                    ip_list = generate_ips(network, no_of_ip)
+                    for st_rt in ip_list:
+                        st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+                        if st_rt in rib_routes_json:
+                            st_found = True
+                            found_routes.append(st_rt)
+
+                            if next_hop:
+                                if type(next_hop) is not list:
+                                    next_hop = [next_hop]
+
+                                found_hops = [rib_r["ip"] for rib_r in
+                                              rib_routes_json[st_rt][0][
+                                                  "nexthops"]]
+                                for nh in next_hop:
+                                    nh_found = False
+                                    if nh and nh in found_hops:
+                                        nh_found = True
+                                    else:
+                                        errormsg = ("Nexthop {} is Missing for {}"
+                                                    " route {} in RIB of router"
+                                                    " {}\n".format(next_hop,
+                                                                   protocol,
+                                                                   st_rt, dut))
+
+                                        return errormsg
+                        else:
+                            missing_routes.append(st_rt)
+                if nh_found:
+                    logger.info("Found next_hop %s for all routes in RIB of"
+                                " router %s\n", next_hop, dut)
+
+                if not st_found and len(missing_routes) > 0:
+                    errormsg = "Missing route in RIB of router {}, routes: " \
+                               "{}\n".format(dut, missing_routes)
+                    return errormsg
+
+                logger.info("Verified routes in router %s RIB, found routes"
+                            " are: %s\n", dut, found_routes)
+
+            advertise_network = input_dict[routerInput].setdefault(
+                "advertise_networks", {})
+            if advertise_network:
+                found_routes = []
+                missing_routes = []
+                found = False
+                for advertise_network_dict in advertise_network:
+                    start_ip = advertise_network_dict["network"]
+                    if "no_of_network" in advertise_network_dict:
+                        no_of_network = advertise_network_dict["no_of_network"]
+                    else:
+                        no_of_network = 0
+
+                    # Generating IPs for verification
+                    ip_list = generate_ips(start_ip, no_of_network)
+                    for st_rt in ip_list:
+                        st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+                        if st_rt in rib_routes_json:
+                            found = True
+                            found_routes.append(st_rt)
+                        else:
+                            missing_routes.append(st_rt)
+
+                if not found and len(missing_routes) > 0:
+                    errormsg = "Missing route in RIB of router {}, are: {}" \
+                               " \n".format(dut, missing_routes)
+                    return errormsg
+
+                logger.info("Verified routes in router %s RIB, found routes"
+                            " are: %s", dut, found_routes)
+
+    logger.info("Exiting lib API: verify_rib()")
+    return True
+
+
+def verify_admin_distance_for_static_routes(tgen, input_dict):
+    """
+    API to verify admin distance for static routes as defined in input_dict/
+    input JSON by running show ip/ipv6 route json command.
+
+    Parameter
+    ---------
+    * `tgen` : topogen object
+    * `input_dict`: having details like - for which router and static routes
+                    admin dsitance needs to be verified
+    Usage
+    -----
+    # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
+    10.0.0.2 in router r1
+    input_dict = {
+        "r1": {
+            "static_routes": [{
+                "network": "10.0.20.1/32",
+                "admin_distance": 10,
+                "next_hop": "10.0.0.2"
+            }]
+        }
+    }
+    result = verify_admin_distance_for_static_routes(tgen, input_dict)
+
+    Returns
+    -------
+    errormsg(str) or True
+    """
+
+    logger.info("Entering lib API: verify_admin_distance_for_static_routes()")
+
+    for router in input_dict.keys():
+        if router not in tgen.routers():
+            continue
+
+        rnode = tgen.routers()[router]
+
+        for static_route in input_dict[router]["static_routes"]:
+            addr_type = validate_ip_address(static_route["network"])
+            # Command to execute
+            if addr_type == "ipv4":
+                command = "show ip route json"
+            else:
+                command = "show ipv6 route json"
+            show_ip_route_json = rnode.vtysh_cmd(command, isjson=True)
+
+            logger.info("Verifying admin distance for static route %s"
+                        " under dut %s:", static_route, router)
+            network = static_route["network"]
+            next_hop = static_route["next_hop"]
+            admin_distance = static_route["admin_distance"]
+            route_data = show_ip_route_json[network][0]
+            if network in show_ip_route_json:
+                if route_data["nexthops"][0]["ip"] == next_hop:
+                    if route_data["distance"] != admin_distance:
+                        errormsg = ("Verification failed: admin distance"
+                                    " for static route {} under dut {},"
+                                    " found:{} but expected:{}".
+                                    format(static_route, router,
+                                           route_data["distance"],
+                                           admin_distance))
+                        return errormsg
+                    else:
+                        logger.info("Verification successful: admin"
+                                    " distance for static route %s under"
+                                    " dut %s, found:%s", static_route,
+                                    router, route_data["distance"])
+
+            else:
+                errormsg = ("Static route {} not found in "
+                            "show_ip_route_json for dut {}".
+                            format(network, router))
+                return errormsg
+
+    logger.info("Exiting lib API: verify_admin_distance_for_static_routes()")
+    return True