diff options
Diffstat (limited to 'tests/topotests/lib/bgp.py')
| -rw-r--r-- | tests/topotests/lib/bgp.py | 527 |
1 files changed, 525 insertions, 2 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index d183fb9f60..b2cd2d284d 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -33,6 +33,7 @@ from lib.common_config import ( load_config_to_router, check_address_types, generate_ips, + validate_ip_address, find_interface_with_greater_ip, run_frr_cmd, retry, @@ -82,6 +83,9 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False): "holddowntimer": 180, "dest_link": { "r4": { + "allowas-in": { + "number_occurences":2 + }, "prefix_lists": [ { "name": "pf_list_1", @@ -469,6 +473,7 @@ def __create_bgp_unicast_address_family( prefix_lists = peer.setdefault("prefix_lists", {}) route_maps = peer.setdefault("route_maps", {}) no_send_community = peer.setdefault("no_send_community", None) + allowas_in = peer.setdefault("allowas-in", None) # next-hop-self if next_hop_self: @@ -483,6 +488,13 @@ def __create_bgp_unicast_address_family( "no {} send-community {}".format(neigh_cxt, no_send_community) ) + if "allowas_in" in peer: + allow_as_in = peer["allowas_in"] + config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in)) + + if "no_allowas_in" in peer: + allow_as_in = peer["no_allowas_in"] + config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in)) if prefix_lists: for prefix_list in prefix_lists: name = prefix_list.setdefault("name", {}) @@ -517,6 +529,17 @@ def __create_bgp_unicast_address_family( cmd = "no {}".format(cmd) config_data.append(cmd) + if allowas_in: + number_occurences = allowas_in.setdefault("number_occurences", {}) + del_action = allowas_in.setdefault("delete", False) + + cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences) + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + return config_data @@ -616,6 +639,9 @@ def verify_bgp_convergence(tgen, topo): logger.debug("Entering lib API: verify_bgp_convergence()") for router, rnode in tgen.routers().iteritems(): + if "bgp" not in topo["routers"][router]: + continue + logger.info("Verifying BGP Convergence on router %s", router) show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) # Verifying output dictionary show_bgp_json is empty or not @@ -929,7 +955,6 @@ def clear_bgp_and_verify(tgen, topo, router): ) return errormsg - logger.info(peer_uptime_before_clear_bgp) # Clearing BGP logger.info("Clearing BGP neighborship for router %s..", router) for addr_type in bgp_addr_type.keys(): @@ -1008,7 +1033,7 @@ def clear_bgp_and_verify(tgen, topo, router): " router {}".format(router) ) return errormsg - logger.info(peer_uptime_after_clear_bgp) + # Comparing peerUptimeEstablishedEpoch dictionaries if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp: logger.info("BGP neighborship is reset after clear BGP on router %s", router) @@ -1679,3 +1704,501 @@ def verify_best_path_as_per_admin_distance( logger.info("Exiting lib API: verify_best_path_as_per_admin_distance()") return True + + +@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None): + """ + This API is to verify whether bgp rib has any + matching route for a nexthop. + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: input dut router name + * `addr_type` : ip type ipv4/ipv6 + * `input_dict` : input dict, has details of static routes + * `next_hop`[optional]: next_hop which needs to be verified, + default = static + * 'aspath'[optional]: aspath which needs to be verified + + Usage + ----- + dut = 'r1' + next_hop = "192.168.1.10" + input_dict = topo['routers'] + aspath = "100 200 300" + result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict, + next_hop, aspath) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_bgp_rib()") + + router_list = tgen.routers() + additional_nexthops_in_required_nhs = [] + list1 = [] + list2 = [] + for routerInput in input_dict.keys(): + for router, rnode in router_list.iteritems(): + if router != dut: + continue + + # Verifying RIB routes + command = "show bgp" + + # Static routes + sleep(2) + logger.info("Checking router {} BGP RIB:".format(dut)) + + if "static_routes" in input_dict[routerInput]: + static_routes = input_dict[routerInput]["static_routes"] + + for static_route in static_routes: + found_routes = [] + missing_routes = [] + st_found = False + nh_found = False + vrf = static_route.setdefault("vrf", None) + if vrf: + cmd = "{} vrf {} {}".format(command, vrf, addr_type) + + else: + cmd = "{} {}".format(command, addr_type) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) == False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + network = static_route["network"] + + if "no_of_ip" in static_route: + no_of_ip = static_route["no_of_ip"] + else: + no_of_ip = 1 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_ip) + + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json["routes"]: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if not isinstance(next_hop, list): + next_hop = [next_hop] + list1 = next_hop + found_hops = [ + rib_r["ip"] + for rib_r in rib_routes_json["routes"][st_rt][0][ + "nexthops" + ] + ] + list2 = found_hops + missing_list_of_nexthops = set(list2).difference(list1) + additional_nexthops_in_required_nhs = set( + list1 + ).difference(list2) + + if list2: + if additional_nexthops_in_required_nhs: + logger.info( + "Missing nexthop %s for route" + " %s in RIB of router %s\n", + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) + return errormsg + else: + nh_found = True + if aspath: + found_paths = rib_routes_json["routes"][st_rt][0][ + "path" + ] + if aspath == found_paths: + aspath_found = True + logger.info( + "Found AS path {} for route" + " {} in RIB of router " + "{}\n".format(aspath, st_rt, dut) + ) + else: + errormsg = ( + "AS Path {} is missing for route" + "for route {} in RIB of router {}\n".format( + aspath, st_rt, dut + ) + ) + return errormsg + + else: + missing_routes.append(st_rt) + + if nh_found: + logger.info( + "Found next_hop {} for all bgp" + " routes in RIB of" + " router {}\n".format(next_hop, router) + ) + + if len(missing_routes) > 0: + errormsg = ( + "Missing route in RIB of router {}, " + "routes: {}\n".format(dut, missing_routes) + ) + return errormsg + + if found_routes: + logger.info( + "Verified routes in router {} BGP RIB, " + "found routes are: {} \n".format(dut, found_routes) + ) + continue + + if "bgp" not in input_dict[routerInput]: + continue + + # Advertise networks + bgp_data_list = input_dict[routerInput]["bgp"] + + if type(bgp_data_list) is not list: + bgp_data_list = [bgp_data_list] + + for bgp_data in bgp_data_list: + vrf_id = bgp_data.setdefault("vrf", None) + if vrf_id: + cmd = "{} vrf {} {}".format(command, vrf_id, addr_type) + else: + cmd = "{} {}".format(command, addr_type) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) == False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"] + advertise_network = bgp_net_advertise.setdefault( + "advertise_networks", [] + ) + + for advertise_network_dict in advertise_network: + found_routes = [] + missing_routes = [] + found = False + + network = advertise_network_dict["network"] + + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 1 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_network) + + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json["routes"]: + found = True + found_routes.append(st_rt) + else: + found = False + missing_routes.append(st_rt) + + if len(missing_routes) > 0: + errormsg = ( + "Missing route in BGP RIB of router {}," + " are: {}\n".format(dut, missing_routes) + ) + return errormsg + + if found_routes: + logger.info( + "Verified routes in router {} BGP RIB, found " + "routes are: {}\n".format(dut, found_routes) + ) + + logger.debug("Exiting lib API: verify_bgp_rib()") + return True + + +@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None): + """ + This API is to verify whether bgp rib has any + matching route for a nexthop. + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: input dut router name + * `addr_type` : ip type ipv4/ipv6 + * `input_dict` : input dict, has details of static routes + * `next_hop`[optional]: next_hop which needs to be verified, + default = static + * 'aspath'[optional]: aspath which needs to be verified + + Usage + ----- + dut = 'r1' + next_hop = "192.168.1.10" + input_dict = topo['routers'] + aspath = "100 200 300" + result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict, + next_hop, aspath) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_bgp_rib()") + + router_list = tgen.routers() + additional_nexthops_in_required_nhs = [] + list1 = [] + list2 = [] + for routerInput in input_dict.keys(): + for router, rnode in router_list.iteritems(): + if router != dut: + continue + + # Verifying RIB routes + command = "show bgp" + + # Static routes + sleep(2) + logger.info("Checking router {} BGP RIB:".format(dut)) + + if "static_routes" in input_dict[routerInput]: + static_routes = input_dict[routerInput]["static_routes"] + + for static_route in static_routes: + found_routes = [] + missing_routes = [] + st_found = False + nh_found = False + vrf = static_route.setdefault("vrf", None) + if vrf: + cmd = "{} vrf {} {}".format(command, vrf, addr_type) + + else: + cmd = "{} {}".format(command, addr_type) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) == False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + network = static_route["network"] + + if "no_of_ip" in static_route: + no_of_ip = static_route["no_of_ip"] + else: + no_of_ip = 1 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_ip) + + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json["routes"]: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if not isinstance(next_hop, list): + next_hop = [next_hop] + list1 = next_hop + found_hops = [ + rib_r["ip"] + for rib_r in rib_routes_json["routes"][st_rt][0][ + "nexthops" + ] + ] + list2 = found_hops + missing_list_of_nexthops = set(list2).difference(list1) + additional_nexthops_in_required_nhs = set( + list1 + ).difference(list2) + + if list2: + if additional_nexthops_in_required_nhs: + logger.info( + "Missing nexthop %s for route" + " %s in RIB of router %s\n", + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) + return errormsg + else: + nh_found = True + if aspath: + found_paths = rib_routes_json["routes"][st_rt][0][ + "path" + ] + if aspath == found_paths: + aspath_found = True + logger.info( + "Found AS path {} for route" + " {} in RIB of router " + "{}\n".format(aspath, st_rt, dut) + ) + else: + errormsg = ( + "AS Path {} is missing for route" + "for route {} in RIB of router {}\n".format( + aspath, st_rt, dut + ) + ) + return errormsg + + else: + missing_routes.append(st_rt) + + if nh_found: + logger.info( + "Found next_hop {} for all bgp" + " routes in RIB of" + " router {}\n".format(next_hop, router) + ) + + if len(missing_routes) > 0: + errormsg = ( + "Missing route in RIB of router {}, " + "routes: {}\n".format(dut, missing_routes) + ) + return errormsg + + if found_routes: + logger.info( + "Verified routes in router {} BGP RIB, " + "found routes are: {} \n".format(dut, found_routes) + ) + continue + + if "bgp" not in input_dict[routerInput]: + continue + + # Advertise networks + bgp_data_list = input_dict[routerInput]["bgp"] + + if type(bgp_data_list) is not list: + bgp_data_list = [bgp_data_list] + + for bgp_data in bgp_data_list: + vrf_id = bgp_data.setdefault("vrf", None) + if vrf_id: + cmd = "{} vrf {} {}".format(command, vrf_id, addr_type) + else: + cmd = "{} {}".format(command, addr_type) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) == False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"] + advertise_network = bgp_net_advertise.setdefault( + "advertise_networks", [] + ) + + for advertise_network_dict in advertise_network: + found_routes = [] + missing_routes = [] + found = False + + network = advertise_network_dict["network"] + + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 1 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_network) + + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json["routes"]: + found = True + found_routes.append(st_rt) + else: + found = False + missing_routes.append(st_rt) + + if len(missing_routes) > 0: + errormsg = ( + "Missing route in BGP RIB of router {}," + " are: {}\n".format(dut, missing_routes) + ) + return errormsg + + if found_routes: + logger.info( + "Verified routes in router {} BGP RIB, found " + "routes are: {}\n".format(dut, found_routes) + ) + + logger.debug("Exiting lib API: verify_bgp_rib()") + return True |
