]> git.puffer.fish Git - matthieu/frr.git/commitdiff
tests: Add f/w support for bgp_vrf_dynamic_route_leak tests automation
authorKuldeep Kashyap <kashyapk@vmware.com>
Mon, 17 Aug 2020 03:57:55 +0000 (03:57 +0000)
committerKuldeep Kashyap <kashyapk@vmware.com>
Tue, 1 Sep 2020 15:48:10 +0000 (15:48 +0000)
1. Adding f/w support for bgp_vrf_dynamic_route_leak tests automation

Signed-off-by: Kuldeep Kashyap <kashyapk@vmware.com>
tests/topotests/lib/bgp.py
tests/topotests/lib/common_config.py

index 316d4cf414b3c5932fe66bfd99da5cb55d39edb2..e2f887bb89021d2228d37d9daf91023f13d71c4d 100644 (file)
@@ -456,6 +456,15 @@ def __create_bgp_unicast_neighbor(
                         cmd = "no {}".format(cmd)
                     config_data.append(cmd)
 
+        import_vrf_data = addr_data.setdefault("import", {})
+        if import_vrf_data:
+            cmd = "import vrf {}".format(import_vrf_data["vrf"])
+
+            del_action = import_vrf_data.setdefault("delete", False)
+            if del_action:
+                cmd = "no {}".format(cmd)
+            config_data.append(cmd)
+
         if "neighbor" in addr_data:
             neigh_data = __create_bgp_neighbor(
                 topo, input_dict, router, addr_type, add_neigh
@@ -2610,6 +2619,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
                                 if not isinstance(next_hop, list):
                                     next_hop = [next_hop]
                                     list1 = next_hop
+
                                 found_hops = [
                                     rib_r["ip"]
                                     for rib_r in rib_routes_json["routes"][st_rt][0][
@@ -2617,6 +2627,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
                                     ]
                                 ]
                                 list2 = found_hops
+
                                 missing_list_of_nexthops = set(list2).difference(list1)
                                 additional_nexthops_in_required_nhs = set(
                                     list1
index d77946b1ed6f33033fa9199e7003b39683d3aba1..3a41a7c54f090ae6a7e7d25e79006d346ba9f6da 100644 (file)
@@ -32,6 +32,7 @@ from tempfile import mkdtemp
 
 import os
 import sys
+import ipaddr
 import traceback
 import socket
 import ipaddress
@@ -661,6 +662,7 @@ def generate_support_bundle():
     which basically runs defined CLIs and dumps the data to specified location
     """
 
+    """
     tgen = get_topogen()
     router_list = tgen.routers()
     test_name = sys._getframe(2).f_code.co_name
@@ -678,6 +680,9 @@ def generate_support_bundle():
         rnode.run("mkdir -p {}".format(dst_bundle))
         rnode.run("mv -f {}/* {}".format(src_bundle, dst_bundle))
 
+    """
+    pass
+
     return True
 
 
@@ -2517,7 +2522,7 @@ def verify_rib(
     errormsg(str) or True
     """
 
-    logger.info("Entering lib API: verify_rib()")
+    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
 
     router_list = tgen.routers()
     additional_nexthops_in_required_nhs = []
@@ -2832,7 +2837,261 @@ def verify_rib(
                         " routes  are: {}\n".format(addr_type, dut, found_routes)
                     )
 
-    logger.info("Exiting lib API: verify_rib()")
+    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+    return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
+    """
+    Data will be read from input_dict or input JSON file, API will generate
+    same prefixes, which were redistributed by either create_static_routes() or
+    advertise_networks_using_network_command() and will verify next_hop and
+    each prefix/routes is present in "show ip/ipv6 fib json"
+    command o/p.
+
+    Parameters
+    ----------
+    * `tgen` : topogen object
+    * `addr_type` : ip type, ipv4/ipv6
+    * `dut`: Device Under Test, for which user wants to test the data
+    * `input_dict` : input dict, has details of static routes
+    * `next_hop`[optional]: next_hop which needs to be verified,
+                           default: static
+
+    Usage
+    -----
+    input_routes_r1 = {
+        "r1": {
+            "static_routes": [{
+                "network": ["1.1.1.1/32],
+                "next_hop": "Null0",
+                "vrf": "RED"
+            }]
+        }
+    }
+    result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
+
+    Returns
+    -------
+    errormsg(str) or True
+    """
+
+    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+    router_list = tgen.routers()
+    for routerInput in input_dict.keys():
+        for router, rnode in router_list.iteritems():
+            if router != dut:
+                continue
+
+            logger.info("Checking router %s FIB routes:", router)
+
+            # Verifying RIB routes
+            if addr_type == "ipv4":
+                command = "show ip fib"
+            else:
+                command = "show ipv6 fib"
+
+            found_routes = []
+            missing_routes = []
+
+            if "static_routes" in input_dict[routerInput]:
+                static_routes = input_dict[routerInput]["static_routes"]
+
+                for static_route in static_routes:
+                    if "vrf" in static_route and static_route["vrf"] is not None:
+
+                        logger.info(
+                            "[DUT: {}]: Verifying routes for VRF:"
+                            " {}".format(router, static_route["vrf"])
+                        )
+
+                        cmd = "{} vrf {}".format(command, static_route["vrf"])
+
+                    else:
+                        cmd = "{}".format(command)
+
+                    cmd = "{} json".format(cmd)
+
+                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+                    # Verifying output dictionary rib_routes_json is not empty
+                    if bool(rib_routes_json) is False:
+                        errormsg = "[DUT: {}]: No route found in fib".format(router)
+                        return errormsg
+
+                    network = static_route["network"]
+                    if "no_of_ip" in static_route:
+                        no_of_ip = static_route["no_of_ip"]
+                    else:
+                        no_of_ip = 1
+
+                    # Generating IPs for verification
+                    ip_list = generate_ips(network, no_of_ip)
+                    st_found = False
+                    nh_found = False
+
+                    for st_rt in ip_list:
+                        st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+                        _addr_type = validate_ip_address(st_rt)
+                        if _addr_type != addr_type:
+                            continue
+
+                        if st_rt in rib_routes_json:
+                            st_found = True
+                            found_routes.append(st_rt)
+
+                            if next_hop:
+                                if type(next_hop) is not list:
+                                    next_hop = [next_hop]
+
+                                count = 0
+                                for nh in next_hop:
+                                    for nh_dict in rib_routes_json[st_rt][0][
+                                        "nexthops"
+                                    ]:
+                                        if nh_dict["ip"] != nh:
+                                            continue
+                                        else:
+                                            count += 1
+
+                                if count == len(next_hop):
+                                    nh_found = True
+                                else:
+                                    missing_routes.append(st_rt)
+                                    errormsg = (
+                                        "Nexthop {} is Missing"
+                                        " for route {} in "
+                                        "RIB of router {}\n".format(
+                                            next_hop, st_rt, dut
+                                        )
+                                    )
+                                    return errormsg
+
+                        else:
+                            missing_routes.append(st_rt)
+
+                if len(missing_routes) > 0:
+                    errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
+                        dut, missing_routes
+                    )
+                    return errormsg
+
+                if nh_found:
+                    logger.info(
+                        "Found next_hop {} for all routes in RIB"
+                        " of router {}\n".format(next_hop, dut)
+                    )
+
+                if found_routes:
+                    logger.info(
+                        "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
+                        dut,
+                        found_routes,
+                    )
+
+                continue
+
+            if "bgp" in input_dict[routerInput]:
+                if (
+                    "advertise_networks"
+                    not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
+                        "unicast"
+                    ]
+                ):
+                    continue
+
+                found_routes = []
+                missing_routes = []
+                advertise_network = input_dict[routerInput]["bgp"]["address_family"][
+                    addr_type
+                ]["unicast"]["advertise_networks"]
+
+                # Continue if there are no network advertise
+                if len(advertise_network) == 0:
+                    continue
+
+                for advertise_network_dict in advertise_network:
+                    if "vrf" in advertise_network_dict:
+                        cmd = "{} vrf {} json".format(command, static_route["vrf"])
+                    else:
+                        cmd = "{} json".format(command)
+
+                rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+                # Verifying output dictionary rib_routes_json is not empty
+                if bool(rib_routes_json) is False:
+                    errormsg = "No route found in rib of router {}..".format(router)
+                    return errormsg
+
+                start_ip = advertise_network_dict["network"]
+                if "no_of_network" in advertise_network_dict:
+                    no_of_network = advertise_network_dict["no_of_network"]
+                else:
+                    no_of_network = 1
+
+                # Generating IPs for verification
+                ip_list = generate_ips(start_ip, no_of_network)
+                st_found = False
+                nh_found = False
+
+                for st_rt in ip_list:
+                    st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+                    _addr_type = validate_ip_address(st_rt)
+                    if _addr_type != addr_type:
+                        continue
+
+                    if st_rt in rib_routes_json:
+                        st_found = True
+                        found_routes.append(st_rt)
+
+                        if next_hop:
+                            if type(next_hop) is not list:
+                                next_hop = [next_hop]
+
+                            count = 0
+                            for nh in next_hop:
+                                for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
+                                    if nh_dict["ip"] != nh:
+                                        continue
+                                    else:
+                                        count += 1
+
+                            if count == len(next_hop):
+                                nh_found = True
+                            else:
+                                missing_routes.append(st_rt)
+                                errormsg = (
+                                    "Nexthop {} is Missing"
+                                    " for route {} in "
+                                    "RIB of router {}\n".format(next_hop, st_rt, dut)
+                                )
+                                return errormsg
+                    else:
+                        missing_routes.append(st_rt)
+
+                if len(missing_routes) > 0:
+                    errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
+                        dut, missing_routes
+                    )
+                    return errormsg
+
+                if nh_found:
+                    logger.info(
+                        "Found next_hop {} for all routes in RIB"
+                        " of router {}\n".format(next_hop, dut)
+                    )
+
+                if found_routes:
+                    logger.info(
+                        "[DUT: {}]: Verified routes FIB"
+                        ", found routes  are: {}\n".format(dut, found_routes)
+                    )
+
+    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
     return True