diff options
Diffstat (limited to 'tests/topotests/lib/common_config.py')
| -rw-r--r-- | tests/topotests/lib/common_config.py | 844 |
1 files changed, 702 insertions, 142 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index 0b19877aff..3de7ab3ebe 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -175,6 +175,49 @@ def run_frr_cmd(rnode, cmd, isjson=False): raise InvalidCLIError("No actual cmd passed") +def apply_raw_config(tgen, input_dict): + + """ + API to configure raw configuration on device. This can be used for any cli + which does has not been implemented in JSON. + + Parameters + ---------- + * `tgen`: tgen onject + * `input_dict`: configuration that needs to be applied + + Usage + ----- + input_dict = { + "r2": { + "raw_config": [ + "router bgp", + "no bgp update-group-split-horizon" + ] + } + } + Returns + ------- + True or errormsg + """ + + result = True + for router_name in input_dict.keys(): + config_cmd = input_dict[router_name]["raw_config"] + + if not isinstance(config_cmd, list): + config_cmd = [config_cmd] + + frr_cfg_file = "{}/{}/{}".format(TMPDIR, router_name, FRRCFG_FILE) + with open(frr_cfg_file, "w") as cfg: + for cmd in config_cmd: + cfg.write("{}\n".format(cmd)) + + result = load_config_to_router(tgen, router_name) + + return result + + def create_common_configuration( tgen, router, data, config_type=None, build=False, load_config=True ): @@ -207,6 +250,7 @@ def create_common_configuration( "bgp_community_list": "! Community List Config\n", "route_maps": "! Route Maps Config\n", "bgp": "! BGP Config\n", + "vrf": "! VRF Config\n", } ) @@ -355,6 +399,7 @@ def reset_config_on_routers(tgen, routerName=None): """ Resets configuration on routers to the snapshot created using input JSON file. It replaces existing router configuration with FRRCFG_BKUP_FILE + Parameters ---------- * `tgen` : Topogen object @@ -370,6 +415,7 @@ def reset_config_on_routers(tgen, routerName=None): router = router_list[rname] logger.info("Configuring router %s to initial test configuration", rname) + cfg = router.run("vtysh -c 'show running'") fname = "{}/{}/frr.sav".format(TMPDIR, rname) dname = "{}/{}/delta.conf".format(TMPDIR, rname) @@ -387,22 +433,13 @@ def reset_config_on_routers(tgen, routerName=None): f.write("\n") f.close() - run_cfg_file = "{}/{}/frr.sav".format(TMPDIR, rname) init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname) - - tempdir = mkdtemp() - with open(os.path.join(tempdir, "vtysh.conf"), "w") as fd: - pass - - command = "/usr/lib/frr/frr-reload.py --confdir {} --input {} --test {} > {}".format( - tempdir, run_cfg_file, init_cfg_file, dname + command = "/usr/lib/frr/frr-reload.py --input {} --test {} > {}".format( + run_cfg_file, init_cfg_file, dname ) result = call(command, shell=True, stderr=SUB_STDOUT, stdout=SUB_PIPE) - os.unlink(os.path.join(tempdir, "vtysh.conf")) - os.rmdir(tempdir) - # Assert if command fail if result > 0: logger.error("Delta file creation failed. Command executed %s", command) @@ -459,17 +496,18 @@ def reset_config_on_routers(tgen, routerName=None): # Router current configuration to log file or console if # "show_router_config" is defined in "pytest.ini" if show_router_config: - logger.info("Configuration on router {} after config reset:".format(rname)) + logger.info("Configuration on router {} after reset:".format(rname)) logger.info(delta.getvalue()) delta.close() - logger.debug("Exting API: reset_config_on_routers") + logger.debug("Exiting API: reset_config_on_routers") return True def load_config_to_router(tgen, routerName, save_bkup=False): """ Loads configuration on router from the file FRRCFG_FILE. + Parameters ---------- * `tgen` : Topogen object @@ -481,7 +519,7 @@ def load_config_to_router(tgen, routerName, save_bkup=False): router_list = tgen.routers() for rname in ROUTER_LIST: - if routerName and routerName != rname: + if routerName and rname != routerName: continue router = router_list[rname] @@ -504,6 +542,7 @@ def load_config_to_router(tgen, routerName, save_bkup=False): raise InvalidCLIError("%s" % output) cfg.truncate(0) + except IOError as err: errormsg = ( "Unable to open config File. error(%s):" " %s", @@ -514,24 +553,29 @@ def load_config_to_router(tgen, routerName, save_bkup=False): # Router current configuration to log file or console if # "show_router_config" is defined in "pytest.ini" if show_router_config: + logger.info("New configuration for router {}:".format(rname)) new_config = router.run("vtysh -c 'show running'") logger.info(new_config) - logger.debug("Exting API: load_config_to_router") + logger.debug("Exiting API: load_config_to_router") return True -def get_frr_ipv6_linklocal(tgen, router, intf=None): +def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None): """ API to get the link local ipv6 address of a perticular interface using FRR command 'show interface' + * `tgen`: tgen onject * `router` : router for which hightest interface should be calculated * `intf` : interface for which linklocal address needs to be taken + * `vrf` : VRF name + Usage ----- linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A) + Returns ------- 1) array of interface names to link local ips. @@ -544,7 +588,10 @@ def get_frr_ipv6_linklocal(tgen, router, intf=None): linklocal = [] - cmd = "show interface" + if vrf: + cmd = "show interface vrf {}".format(vrf) + else: + cmd = "show interface" ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd)) @@ -635,6 +682,55 @@ def start_topology(tgen): tgen.start_router() +def stop_router(tgen, router): + """ + Router"s current config would be saved to /etc/frr/ for each deamon + and router and its deamons would be stopped. + + * `tgen` : topogen object + * `router`: Device under test + """ + + router_list = tgen.routers() + + # Saving router config to /etc/frr, which will be loaded to router + # when it starts + router_list[router].vtysh_cmd("write memory") + + # Stop router + router_list[router].stop() + + +def start_router(tgen, router): + """ + Router will started and config would be loaded from /etc/frr/ for each + deamon + + * `tgen` : topogen object + * `router`: Device under test + """ + + logger.debug("Entering lib API: start_router") + + try: + router_list = tgen.routers() + + # Router and its deamons would be started and config would + # be loaded to router for each deamon from /etc/frr + router_list[router].start() + + # Waiting for router to come up + sleep(5) + + except Exception as e: + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: start_router()") + return True + + def number_to_row(routerName): """ Returns the number for the router. @@ -658,6 +754,190 @@ def number_to_column(routerName): ############################################# +def create_vrf_cfg(tgen, topo, input_dict=None, build=False): + """ + Create vrf configuration for created topology. VRF + configuration is provided in input json file. + + VRF config is done in Linux Kernel: + * Create VRF + * Attach interface to VRF + * Bring up VRF + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : Input dict data, required when configuring + from testcase + * `build` : Only for initial setup phase this is set as True. + + Usage + ----- + input_dict={ + "r3": { + "links": { + "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"}, + "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"}, + "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"}, + "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"}, + }, + "vrfs":[ + { + "name": "RED_A", + "id": "1" + }, + { + "name": "RED_B", + "id": "2" + }, + { + "name": "BLUE_A", + "id": "3", + "delete": True + }, + { + "name": "BLUE_B", + "id": "4" + } + ] + } + } + result = create_vrf_cfg(tgen, topo, input_dict) + + Returns + ------- + True or False + """ + result = True + if not input_dict: + input_dict = deepcopy(topo) + else: + input_dict = deepcopy(input_dict) + + try: + for c_router, c_data in input_dict.iteritems(): + rnode = tgen.routers()[c_router] + if "vrfs" in c_data: + for vrf in c_data["vrfs"]: + config_data = [] + del_action = vrf.setdefault("delete", False) + name = vrf.setdefault("name", None) + table_id = vrf.setdefault("id", None) + vni = vrf.setdefault("vni", None) + del_vni = vrf.setdefault("no_vni", None) + + if del_action: + # Kernel cmd- Add VRF and table + cmd = "ip link del {} type vrf table {}".format( + vrf["name"], vrf["id"] + ) + + logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd) + rnode.run(cmd) + + # Kernel cmd - Bring down VRF + cmd = "ip link set dev {} down".format(name) + logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd) + rnode.run(cmd) + + else: + if name and table_id: + # Kernel cmd- Add VRF and table + cmd = "ip link add {} type vrf table {}".format( + name, table_id + ) + logger.info( + "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd + ) + rnode.run(cmd) + + # Kernel cmd - Bring up VRF + cmd = "ip link set dev {} up".format(name) + logger.info( + "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd + ) + rnode.run(cmd) + + if "links" in c_data: + for destRouterLink, data in sorted( + c_data["links"].iteritems() + ): + # Loopback interfaces + if "type" in data and data["type"] == "loopback": + interface_name = destRouterLink + else: + interface_name = data["interface"] + + if "vrf" in data: + vrf_list = data["vrf"] + + if type(vrf_list) is not list: + vrf_list = [vrf_list] + + for _vrf in vrf_list: + cmd = "ip link set {} master {}".format( + interface_name, _vrf + ) + + logger.info( + "[DUT: %s]: Running" " kernel cmd [%s]", + c_router, + cmd, + ) + rnode.run(cmd) + + result = create_common_configuration( + tgen, c_router, config_data, "vrf", build=build + ) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + return result + + +def create_interface_in_kernel( + tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True +): + """ + Cretae interfaces in kernel for ipv4/ipv6 + Config is done in Linux Kernel: + + Parameters + ---------- + * `tgen` : Topogen object + * `dut` : Device for which interfaces to be added + * `name` : interface name + * `ip_addr` : ip address for interface + * `vrf` : VRF name, to which interface will be associated + * `netmask` : netmask value, default is None + * `create`: Create interface in kernel, if created then no need + to create + """ + + rnode = tgen.routers()[dut] + + if create: + cmd = "sudo ip link add name {} type dummy".format(name) + rnode.run(cmd) + + addr_type = validate_ip_address(ip_addr) + if addr_type == "ipv4": + cmd = "ifconfig {} {} netmask {}".format(name, ip_addr, netmask) + else: + cmd = "ifconfig {} inet6 add {}/{}".format(name, ip_addr, netmask) + + rnode.run(cmd) + + if vrf: + cmd = "ip link set {} master {}".format(name, vrf) + rnode.run(cmd) + + def validate_ip_address(ip_address): """ Validates the type of ip address @@ -860,13 +1140,15 @@ def interface_status(tgen, topo, input_dict): return True -def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0): +def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict=False): """ Retries function execution, if return is an errormsg or exception + * `attempts`: Number of attempts to make * `wait`: Number of seconds to wait between each attempt * `return_is_str`: Return val is an errormsg in case of failure * `initial_wait`: Sleeps for this much seconds before executing function + """ def _retry(func): @@ -883,15 +1165,20 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0): sleep(initial_wait) _return_is_str = kwargs.pop("return_is_str", return_is_str) + _return_is_dict = kwargs.pop("return_is_str", return_is_dict) for i in range(1, _attempts + 1): try: _expected = kwargs.setdefault("expected", True) kwargs.pop("expected") ret = func(*args, **kwargs) logger.debug("Function returned %s" % ret) - if return_is_str and isinstance(ret, bool) and _expected: + if _return_is_str and isinstance(ret, bool) and _expected: return ret - if isinstance(ret, str) and _expected is False: + if ( + isinstance(ret, str) or isinstance(ret, unicode) + ) and _expected is False: + return ret + if _return_is_dict and isinstance(ret, dict): return ret if _attempts == i: @@ -945,16 +1232,19 @@ def create_interfaces_cfg(tgen, topo, build=False): """ Create interface configuration for created topology. Basic Interface configuration is provided in input json file. + Parameters ---------- * `tgen` : Topogen object * `topo` : json file data * `build` : Only for initial setup phase this is set as True. + Returns ------- True or False """ result = False + topo = deepcopy(topo) try: for c_router, c_data in topo.iteritems(): @@ -965,13 +1255,30 @@ def create_interfaces_cfg(tgen, topo, build=False): interface_name = destRouterLink else: interface_name = data["interface"] + interface_data.append("interface {}".format(str(interface_name))) if "ipv4" in data: intf_addr = c_data["links"][destRouterLink]["ipv4"] - interface_data.append("ip address {}".format(intf_addr)) + + if "delete" in data and data["delete"]: + interface_data.append("no ip address {}".format(intf_addr)) + else: + interface_data.append("ip address {}".format(intf_addr)) if "ipv6" in data: intf_addr = c_data["links"][destRouterLink]["ipv6"] - interface_data.append("ipv6 address {}".format(intf_addr)) + + if "delete" in data and data["delete"]: + interface_data.append("no ipv6 address {}".format(intf_addr)) + else: + interface_data.append("ipv6 address {}".format(intf_addr)) + + if "ipv6-link-local" in data: + intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"] + + if "delete" in data and data["delete"]: + interface_data.append("no ipv6 address {}".format(intf_addr)) + else: + interface_data.append("ipv6 address {}\n".format(intf_addr)) result = create_common_configuration( tgen, c_router, interface_data, "interface_config", build=build @@ -988,11 +1295,13 @@ def create_interfaces_cfg(tgen, topo, build=False): def create_static_routes(tgen, input_dict, build=False): """ Create static routes for given router as defined in input_dict + Parameters ---------- * `tgen` : Topogen object * `input_dict` : Input dict data, required when configuring from testcase * `build` : Only for initial setup phase this is set as True. + Usage ----- input_dict should be in the format below: @@ -1002,7 +1311,9 @@ def create_static_routes(tgen, input_dict, build=False): # admin_distance: admin distance for route/routes. # next_hop: starting next-hop address # tag: tag id for static routes + # vrf: VRF name in which static routes needs to be created # delete: True if config to be removed. Default False. + Example: "routers": { "r1": { @@ -1012,24 +1323,27 @@ def create_static_routes(tgen, input_dict, build=False): "no_of_ip": 9, "admin_distance": 100, "next_hop": "10.0.0.1", - "tag": 4001 + "tag": 4001, + "vrf": "RED_A" "delete": true } ] } } + Returns ------- errormsg(str) or True """ result = False - logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + logger.debug("Entering lib API: create_static_routes()") input_dict = deepcopy(input_dict) + try: for router in input_dict.keys(): if "static_routes" not in input_dict[router]: errormsg = "static_routes not present in input_dict" - logger.debug(errormsg) + logger.info(errormsg) continue static_routes_list = [] @@ -1037,27 +1351,38 @@ def create_static_routes(tgen, input_dict, build=False): static_routes = input_dict[router]["static_routes"] for static_route in static_routes: del_action = static_route.setdefault("delete", False) - # No of IPs no_of_ip = static_route.setdefault("no_of_ip", 1) - admin_distance = static_route.setdefault("admin_distance", None) - tag = static_route.setdefault("tag", None) - if "next_hop" not in static_route or "network" not in static_route: - errormsg = "'next_hop' or 'network' missing in" " input_dict" - return errormsg - - next_hop = static_route["next_hop"] - network = static_route["network"] + network = static_route.setdefault("network", []) if type(network) is not list: network = [network] + admin_distance = static_route.setdefault("admin_distance", None) + tag = static_route.setdefault("tag", None) + vrf = static_route.setdefault("vrf", None) + interface = static_route.setdefault("interface", None) + next_hop = static_route.setdefault("next_hop", None) + nexthop_vrf = static_route.setdefault("nexthop_vrf", None) + ip_list = generate_ips(network, no_of_ip) for ip in ip_list: addr_type = validate_ip_address(ip) if addr_type == "ipv4": - cmd = "ip route {} {}".format(ip, next_hop) + cmd = "ip route {}".format(ip) else: - cmd = "ipv6 route {} {}".format(ip, next_hop) + cmd = "ipv6 route {}".format(ip) + + if interface: + cmd = "{} {}".format(cmd, interface) + + if next_hop: + cmd = "{} {}".format(cmd, next_hop) + + if nexthop_vrf: + cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf) + + if vrf: + cmd = "{} vrf {}".format(cmd, vrf) if tag: cmd = "{} tag {}".format(cmd, str(tag)) @@ -1080,7 +1405,7 @@ def create_static_routes(tgen, input_dict, build=False): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + logger.debug("Exiting lib API: create_static_routes()") return result @@ -1433,6 +1758,9 @@ def create_route_maps(tgen, input_dict, build=False): "large_community_list", {} ) + metric = match_data.setdefault("metric", None) + source_vrf = match_data.setdefault("source-vrf", None) + if ipv4_data: # fetch prefix list data from rmap prefix_name = ipv4_data.setdefault("prefix_lists", None) @@ -1525,6 +1853,14 @@ def create_route_maps(tgen, input_dict, build=False): cmd = "{} exact-match".format(cmd) rmap_data.append(cmd) + if source_vrf: + cmd = "match source-vrf {}".format(source_vrf) + rmap_data.append(cmd) + + if metric: + cmd = "match metric {}".format(metric) + rmap_data.append(cmd) + result = create_common_configuration( tgen, router, rmap_data, "route_maps", build=build ) @@ -1689,17 +2025,97 @@ def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False): interface_set_status(router_list[dut], intf_name, ifaceaction) +def addKernelRoute( + tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None +): + """ + Add route to kernel + + Parameters: + ----------- + * `tgen` : Topogen object + * `router`: router for which kernal routes needs to be added + * `intf`: interface name, for which kernal routes needs to be added + * `bindToAddress`: bind to <host>, an interface or multicast + address + + returns: + -------- + errormsg or True + """ + + logger.debug("Entering lib API: addKernelRoute()") + + rnode = tgen.routers()[router] + + if type(group_addr_range) is not list: + group_addr_range = [group_addr_range] + + for grp_addr in group_addr_range: + + addr_type = validate_ip_address(grp_addr) + if addr_type == "ipv4": + if next_hop is not None: + cmd = "ip route add {} via {}".format(grp_addr, next_hop) + else: + cmd = "ip route add {} dev {}".format(grp_addr, intf) + if del_action: + cmd = "ip route del {}".format(grp_addr) + verify_cmd = "ip route" + elif addr_type == "ipv6": + if intf and src: + cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src) + else: + cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop) + verify_cmd = "ip -6 route" + if del_action: + cmd = "ip -6 route del {}".format(grp_addr) + + logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd)) + output = rnode.run(cmd) + + # Verifying if ip route added to kernal + result = rnode.run(verify_cmd) + logger.debug("{}\n{}".format(verify_cmd, result)) + if "/" in grp_addr: + ip, mask = grp_addr.split("/") + if mask == "32" or mask == "128": + grp_addr = ip + + if not re_search(r"{}".format(grp_addr), result) and mask is not "0": + errormsg = ( + "[DUT: {}]: Kernal route is not added for group" + " address {} Config output: {}".format(router, grp_addr, output) + ) + + return errormsg + + logger.debug("Exiting lib API: addKernelRoute()") + return True + + ############################################# # Verification APIs ############################################# -@retry(attempts=10, return_is_str=True, initial_wait=2) -def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): +@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +def verify_rib( + tgen, + addr_type, + dut, + input_dict, + next_hop=None, + protocol=None, + tag=None, + metric=None, + fib=None, +): """ Data will be read from input_dict or input JSON file, API will generate same prefixes, which were redistributed by either create_static_routes() or advertise_networks_using_network_command() and do will verify next_hop and each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json" command o/p. + Parameters ---------- * `tgen` : topogen object @@ -1709,12 +2125,14 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): * `next_hop`[optional]: next_hop which needs to be verified, default: static * `protocol`[optional]: protocol, default = None + Usage ----- # RIB can be verified for static routes OR network advertised using network command. Following are input_dicts to create static routes and advertise networks using network command. Any one of the input_dict can be passed to verify_rib() to verify routes in DUT"s RIB. + # Creating static routes for r1 input_dict = { "r1": { @@ -1732,186 +2150,328 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): dut = "r2" protocol = "bgp" result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol) + Returns ------- errormsg(str) or True """ - logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + logger.info("Entering lib API: verify_rib()") router_list = tgen.routers() + additional_nexthops_in_required_nhs = [] + found_hops = [] for routerInput in input_dict.keys(): for router, rnode in router_list.iteritems(): if router != dut: continue + logger.info("Checking router %s RIB:", router) + # Verifying RIB routes if addr_type == "ipv4": - if protocol: - command = "show ip route {} json".format(protocol) - else: - command = "show ip route json" + command = "show ip route" else: - if protocol: - command = "show ipv6 route {} json".format(protocol) - else: - command = "show ipv6 route json" - - logger.info("Checking router %s RIB:", router) - rib_routes_json = run_frr_cmd(rnode, command, isjson=True) + command = "show ipv6 route" - # Verifying output dictionary rib_routes_json is not empty - if bool(rib_routes_json) is False: - errormsg = "No {} route found in rib of router {}..".format( - protocol, router - ) - return errormsg + found_routes = [] + missing_routes = [] if "static_routes" in input_dict[routerInput]: static_routes = input_dict[routerInput]["static_routes"] - st_found = False - nh_found = False - found_routes = [] - missing_routes = [] + for static_route in static_routes: + if "vrf" in static_route and static_route["vrf"] is not None: + + logger.info( + "[DUT: {}]: Verifying routes for VRF:" + " {}".format(router, static_route["vrf"]) + ) + + cmd = "{} vrf {}".format(command, static_route["vrf"]) + + else: + cmd = "{}".format(command) + + if protocol: + cmd = "{} {}".format(cmd, protocol) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) is False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + network = static_route["network"] if "no_of_ip" in static_route: no_of_ip = static_route["no_of_ip"] else: no_of_ip = 1 + if "tag" in static_route: + _tag = static_route["tag"] + else: + _tag = None + # Generating IPs for verification ip_list = generate_ips(network, no_of_ip) + st_found = False + nh_found = False + for st_rt in ip_list: st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + if st_rt in rib_routes_json: st_found = True found_routes.append(st_rt) - if next_hop: + if fib and next_hop: if type(next_hop) is not list: next_hop = [next_hop] + for mnh in range(0, len(rib_routes_json[st_rt])): + if ( + "fib" + in rib_routes_json[st_rt][mnh]["nexthops"][0] + ): + found_hops.append( + [ + rib_r["ip"] + for rib_r in rib_routes_json[st_rt][ + mnh + ]["nexthops"] + ] + ) + + if found_hops[0]: + missing_list_of_nexthops = set( + found_hops[0] + ).difference(next_hop) + additional_nexthops_in_required_nhs = set( + next_hop + ).difference(found_hops[0]) + + if additional_nexthops_in_required_nhs: + logger.info( + "Nexthop " + "%s is not active for route %s in " + "RIB of router %s\n", + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is not active" + " for route {} in RIB of router" + " {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) + return errormsg + else: + nh_found = True + + elif next_hop and fib is None: + if type(next_hop) is not list: + next_hop = [next_hop] found_hops = [ rib_r["ip"] for rib_r in rib_routes_json[st_rt][0]["nexthops"] ] - for nh in found_hops: - nh_found = False - if nh and nh in next_hop: - nh_found = True - else: + + if found_hops: + missing_list_of_nexthops = set( + found_hops + ).difference(next_hop) + additional_nexthops_in_required_nhs = set( + next_hop + ).difference(found_hops) + + if additional_nexthops_in_required_nhs: + logger.info( + "Missing nexthop %s for route" + " %s in RIB of router %s\n", + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) errormsg = ( - "Nexthop {} is Missing for {}" - " route {} in RIB of router" - " {}\n".format( - next_hop, protocol, st_rt, dut + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, ) ) - return errormsg + else: + nh_found = True + + if tag: + if "tag" not in rib_routes_json[st_rt][0]: + errormsg = ( + "[DUT: {}]: tag is not" + " present for" + " route {} in RIB \n".format(dut, st_rt) + ) + return errormsg + + if _tag != rib_routes_json[st_rt][0]["tag"]: + errormsg = ( + "[DUT: {}]: tag value {}" + " is not matched for" + " route {} in RIB \n".format(dut, _tag, st_rt,) + ) + return errormsg + + if metric is not None: + if "metric" not in rib_routes_json[st_rt][0]: + errormsg = ( + "[DUT: {}]: metric is" + " not present for" + " route {} in RIB \n".format(dut, st_rt) + ) + return errormsg + + if metric != rib_routes_json[st_rt][0]["metric"]: + errormsg = ( + "[DUT: {}]: metric value " + "{} is not matched for " + "route {} in RIB \n".format(dut, metric, st_rt,) + ) + return errormsg + else: missing_routes.append(st_rt) if nh_found: logger.info( - "Found next_hop %s for all routes in RIB of" " router %s\n", - next_hop, - dut, + "[DUT: {}]: Found next_hop {} for all bgp" + " routes in RIB".format(router, next_hop) ) - if not st_found and len(missing_routes) > 0: - errormsg = ( - "Missing route in RIB of router {}, routes: " - "{}\n".format(dut, missing_routes) + if len(missing_routes) > 0: + errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format( + dut, missing_routes ) return errormsg - logger.info( - "Verified routes in router %s RIB, found routes" " are: %s\n", - dut, - found_routes, - ) + if found_routes: + logger.info( + "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n", + dut, + found_routes, + ) continue if "bgp" in input_dict[routerInput]: if ( "advertise_networks" - in input_dict[routerInput]["bgp"]["address_family"][addr_type][ + not in input_dict[routerInput]["bgp"]["address_family"][addr_type][ "unicast" ] ): + continue - found_routes = [] - missing_routes = [] - advertise_network = input_dict[routerInput]["bgp"][ - "address_family" - ][addr_type]["unicast"]["advertise_networks"] + found_routes = [] + missing_routes = [] + advertise_network = input_dict[routerInput]["bgp"]["address_family"][ + addr_type + ]["unicast"]["advertise_networks"] - for advertise_network_dict in advertise_network: - start_ip = advertise_network_dict["network"] - if "no_of_network" in advertise_network_dict: - no_of_network = advertise_network_dict["no_of_network"] - else: - no_of_network = 1 - - # Generating IPs for verification - ip_list = generate_ips(start_ip, no_of_network) - for st_rt in ip_list: - st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) - - found = False - nh_found = False - if st_rt in rib_routes_json: - found = True - found_routes.append(st_rt) - - if next_hop: - if type(next_hop) is not list: - next_hop = [next_hop] - - for nh in next_hop: - for nh_json in rib_routes_json[st_rt][0][ - "nexthops" - ]: - if nh != nh_json["ip"]: - continue - nh_found = True - - if not nh_found: - errormsg = ( - "Nexthop {} is Missing" - " for {} route {} in " - "RIB of router {}\n".format( - next_hop, protocol, st_rt, dut - ) - ) - return errormsg + # Continue if there are no network advertise + if len(advertise_network) == 0: + continue + + for advertise_network_dict in advertise_network: + if "vrf" in advertise_network_dict: + cmd = "{} vrf {} json".format(command, static_route["vrf"]) + else: + cmd = "{} json".format(command) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) is False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + start_ip = advertise_network_dict["network"] + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 1 + + # Generating IPs for verification + ip_list = generate_ips(start_ip, no_of_network) + st_found = False + nh_found = False + + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if type(next_hop) is not list: + next_hop = [next_hop] + + count = 0 + for nh in next_hop: + for nh_dict in rib_routes_json[st_rt][0]["nexthops"]: + if nh_dict["ip"] != nh: + continue + else: + count += 1 + if count == len(next_hop): + nh_found = True else: - missing_routes.append(st_rt) + errormsg = ( + "Nexthop {} is Missing" + " for route {} in " + "RIB of router {}\n".format(next_hop, st_rt, dut) + ) + return errormsg + else: + missing_routes.append(st_rt) - if nh_found: - logger.info( - "Found next_hop {} for all routes in RIB" - " of router {}\n".format(next_hop, dut) - ) + if nh_found: + logger.info( + "Found next_hop {} for all routes in RIB" + " of router {}\n".format(next_hop, dut) + ) - if not found and len(missing_routes) > 0: - errormsg = ( - "Missing {} route in RIB of router {}, " - "routes: {} \n".format(addr_type, dut, missing_routes) - ) - return errormsg + if len(missing_routes) > 0: + errormsg = ( + "Missing {} route in RIB of router {}, " + "routes: {} \n".format(addr_type, dut, missing_routes) + ) + return errormsg + if found_routes: logger.info( "Verified {} routes in router {} RIB, found" " routes are: {}\n".format(addr_type, dut, found_routes) ) - logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + logger.info("Exiting lib API: verify_rib()") return True |
