import os
import sys
+import ipaddr
import traceback
import socket
import ipaddress
which basically runs defined CLIs and dumps the data to specified location
"""
+ """
tgen = get_topogen()
router_list = tgen.routers()
test_name = sys._getframe(2).f_code.co_name
rnode.run("mkdir -p {}".format(dst_bundle))
rnode.run("mv -f {}/* {}".format(src_bundle, dst_bundle))
+ """
+ pass
+
return True
errormsg(str) or True
"""
- logger.info("Entering lib API: verify_rib()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
" routes are: {}\n".format(addr_type, dut, found_routes)
)
- logger.info("Exiting lib API: verify_rib()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
+ """
+ Data will be read from input_dict or input JSON file, API will generate
+ same prefixes, which were redistributed by either create_static_routes() or
+ advertise_networks_using_network_command() and will verify next_hop and
+ each prefix/routes is present in "show ip/ipv6 fib json"
+ command o/p.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test, for which user wants to test the data
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default: static
+
+ Usage
+ -----
+ input_routes_r1 = {
+ "r1": {
+ "static_routes": [{
+ "network": ["1.1.1.1/32],
+ "next_hop": "Null0",
+ "vrf": "RED"
+ }]
+ }
+ }
+ result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ router_list = tgen.routers()
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ logger.info("Checking router %s FIB routes:", router)
+
+ # Verifying RIB routes
+ if addr_type == "ipv4":
+ command = "show ip fib"
+ else:
+ command = "show ipv6 fib"
+
+ found_routes = []
+ missing_routes = []
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ if "vrf" in static_route and static_route["vrf"] is not None:
+
+ logger.info(
+ "[DUT: {}]: Verifying routes for VRF:"
+ " {}".format(router, static_route["vrf"])
+ )
+
+ cmd = "{} vrf {}".format(command, static_route["vrf"])
+
+ else:
+ cmd = "{}".format(command)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "[DUT: {}]: No route found in fib".format(router)
+ return errormsg
+
+ network = static_route["network"]
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ count = 0
+ for nh in next_hop:
+ for nh_dict in rib_routes_json[st_rt][0][
+ "nexthops"
+ ]:
+ if nh_dict["ip"] != nh:
+ continue
+ else:
+ count += 1
+
+ if count == len(next_hop):
+ nh_found = True
+ else:
+ missing_routes.append(st_rt)
+ errormsg = (
+ "Nexthop {} is Missing"
+ " for route {} in "
+ "RIB of router {}\n".format(
+ next_hop, st_rt, dut
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
+ dut, missing_routes
+ )
+ return errormsg
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all routes in RIB"
+ " of router {}\n".format(next_hop, dut)
+ )
+
+ if found_routes:
+ logger.info(
+ "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
+ dut,
+ found_routes,
+ )
+
+ continue
+
+ if "bgp" in input_dict[routerInput]:
+ if (
+ "advertise_networks"
+ not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
+ "unicast"
+ ]
+ ):
+ continue
+
+ found_routes = []
+ missing_routes = []
+ advertise_network = input_dict[routerInput]["bgp"]["address_family"][
+ addr_type
+ ]["unicast"]["advertise_networks"]
+
+ # Continue if there are no network advertise
+ if len(advertise_network) == 0:
+ continue
+
+ for advertise_network_dict in advertise_network:
+ if "vrf" in advertise_network_dict:
+ cmd = "{} vrf {} json".format(command, static_route["vrf"])
+ else:
+ cmd = "{} json".format(command)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ start_ip = advertise_network_dict["network"]
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(start_ip, no_of_network)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ count = 0
+ for nh in next_hop:
+ for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
+ if nh_dict["ip"] != nh:
+ continue
+ else:
+ count += 1
+
+ if count == len(next_hop):
+ nh_found = True
+ else:
+ missing_routes.append(st_rt)
+ errormsg = (
+ "Nexthop {} is Missing"
+ " for route {} in "
+ "RIB of router {}\n".format(next_hop, st_rt, dut)
+ )
+ return errormsg
+ else:
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
+ dut, missing_routes
+ )
+ return errormsg
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all routes in RIB"
+ " of router {}\n".format(next_hop, dut)
+ )
+
+ if found_routes:
+ logger.info(
+ "[DUT: {}]: Verified routes FIB"
+ ", found routes are: {}\n".format(dut, found_routes)
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True