diff options
Diffstat (limited to 'tests/topotests/lib/bgp.py')
| -rw-r--r-- | tests/topotests/lib/bgp.py | 1279 |
1 files changed, 929 insertions, 350 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 997b72d691..cafba60abf 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -27,13 +27,17 @@ from lib import topotest from lib.topolog import logger # Import common_config to use commomnly used APIs -from lib.common_config import (create_common_configuration, - InvalidCLIError, - load_config_to_router, - check_address_types, - generate_ips, - find_interface_with_greater_ip, - run_frr_cmd, retry) +from lib.common_config import ( + create_common_configuration, + InvalidCLIError, + load_config_to_router, + check_address_types, + generate_ips, + validate_ip_address, + find_interface_with_greater_ip, + run_frr_cmd, + retry, +) BGP_CONVERGENCE_TIMEOUT = 10 @@ -79,6 +83,9 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False): "holddowntimer": 180, "dest_link": { "r4": { + "allowas-in": { + "number_occurences":2 + }, "prefix_lists": [ { "name": "pf_list_1", @@ -126,24 +133,31 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False): bgp_addr_data = bgp_data.setdefault("address_family", {}) if not bgp_addr_data: - logger.debug("Router %s: 'address_family' not present in " - "input_dict for BGP", router) + logger.debug( + "Router %s: 'address_family' not present in " "input_dict for BGP", + router, + ) else: ipv4_data = bgp_addr_data.setdefault("ipv4", {}) ipv6_data = bgp_addr_data.setdefault("ipv6", {}) - neigh_unicast = True if ipv4_data.setdefault("unicast", {}) \ - or ipv6_data.setdefault("unicast", {}) else False + neigh_unicast = ( + True + if ipv4_data.setdefault("unicast", {}) + or ipv6_data.setdefault("unicast", {}) + else False + ) if neigh_unicast: data_all_bgp = __create_bgp_unicast_neighbor( - tgen, topo, input_dict, router, - config_data=data_all_bgp) + tgen, topo, input_dict, router, config_data=data_all_bgp + ) try: - result = create_common_configuration(tgen, router, data_all_bgp, - "bgp", build) + result = create_common_configuration( + tgen, router, data_all_bgp, "bgp", build + ) except InvalidCLIError: # Traceback errormsg = traceback.format_exc() @@ -182,8 +196,9 @@ def __create_bgp_global(tgen, input_dict, router, build=False): config_data = [] if "local_as" not in bgp_data and build: - logger.error("Router %s: 'local_as' not present in input_dict" - "for BGP", router) + logger.error( + "Router %s: 'local_as' not present in input_dict" "for BGP", router + ) return False local_as = bgp_data.setdefault("local_as", "") @@ -194,19 +209,20 @@ def __create_bgp_global(tgen, input_dict, router, build=False): config_data.append(cmd) + # Skip RFC8212 in topotests + config_data.append("no bgp ebgp-requires-policy") + router_id = bgp_data.setdefault("router_id", None) del_router_id = bgp_data.setdefault("del_router_id", False) if del_router_id: config_data.append("no bgp router-id") if router_id: - config_data.append("bgp router-id {}".format( - router_id)) + config_data.append("bgp router-id {}".format(router_id)) return config_data -def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, - config_data=None): +def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, config_data=None): """ Helper API to create configuration for address-family unicast @@ -235,11 +251,8 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, addr_data = addr_dict["unicast"] if addr_data: - config_data.append("address-family {} unicast".format( - addr_type - )) - advertise_network = addr_data.setdefault("advertise_networks", - []) + config_data.append("address-family {} unicast".format(addr_type)) + advertise_network = addr_data.setdefault("advertise_networks", []) for advertise_network_dict in advertise_network: network = advertise_network_dict["network"] if type(network) is not list: @@ -250,12 +263,10 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, else: no_of_network = 1 - del_action = advertise_network_dict.setdefault("delete", - False) + del_action = advertise_network_dict.setdefault("delete", False) # Generating IPs for verification - prefix = str( - ipaddr.IPNetwork(unicode(network[0])).prefixlen) + prefix = str(ipaddr.IPNetwork(unicode(network[0])).prefixlen) network_list = generate_ips(network, no_of_network) for ip in network_list: ip = str(ipaddr.IPNetwork(unicode(ip)).network) @@ -271,20 +282,17 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, ibgp = max_paths.setdefault("ibgp", None) ebgp = max_paths.setdefault("ebgp", None) if ibgp: - config_data.append("maximum-paths ibgp {}".format( - ibgp - )) + config_data.append("maximum-paths ibgp {}".format(ibgp)) if ebgp: - config_data.append("maximum-paths {}".format( - ebgp - )) + config_data.append("maximum-paths {}".format(ebgp)) aggregate_addresses = addr_data.setdefault("aggregate_address", []) for aggregate_address in aggregate_addresses: network = aggregate_address.setdefault("network", None) if not network: - logger.debug("Router %s: 'network' not present in " - "input_dict for BGP", router) + logger.debug( + "Router %s: 'network' not present in " "input_dict for BGP", router + ) else: cmd = "aggregate-address {}".format(network) @@ -305,13 +313,12 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, if redistribute_data: for redistribute in redistribute_data: if "redist_type" not in redistribute: - logger.error("Router %s: 'redist_type' not present in " - "input_dict", router) + logger.error( + "Router %s: 'redist_type' not present in " "input_dict", router + ) else: - cmd = "redistribute {}".format( - redistribute["redist_type"]) - redist_attr = redistribute.setdefault("attribute", - None) + cmd = "redistribute {}".format(redistribute["redist_type"]) + redist_attr = redistribute.setdefault("attribute", None) if redist_attr: cmd = "{} {}".format(cmd, redist_attr) del_action = redistribute.setdefault("delete", False) @@ -320,8 +327,9 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, config_data.append(cmd) if "neighbor" in addr_data: - neigh_data = __create_bgp_neighbor(topo, input_dict, - router, addr_type, add_neigh) + neigh_data = __create_bgp_neighbor( + topo, input_dict, router, addr_type, add_neigh + ) config_data.extend(neigh_data) for addr_type, addr_dict in bgp_data.iteritems(): @@ -331,11 +339,11 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, addr_data = addr_dict["unicast"] if "neighbor" in addr_data: neigh_addr_data = __create_bgp_unicast_address_family( - topo, input_dict, router, addr_type, add_neigh) + topo, input_dict, router, addr_type, add_neigh + ) config_data.extend(neigh_addr_data) - logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()") return config_data @@ -365,12 +373,10 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): update_source = None if dest_link in nh_details["links"].keys(): - ip_addr = \ - nh_details["links"][dest_link][addr_type].split("/")[0] + ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0] # Loopback interface if "source_link" in peer and peer["source_link"] == "lo": - update_source = topo[router]["links"]["lo"][ - addr_type].split("/")[0] + update_source = topo[router]["links"]["lo"][addr_type].split("/")[0] neigh_cxt = "neighbor {}".format(ip_addr) @@ -380,41 +386,44 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): config_data.append("address-family ipv6 unicast") config_data.append("{} activate".format(neigh_cxt)) - disable_connected = peer.setdefault("disable_connected_check", - False) + disable_connected = peer.setdefault("disable_connected_check", False) keep_alive = peer.setdefault("keepalivetimer", 60) hold_down = peer.setdefault("holddowntimer", 180) password = peer.setdefault("password", None) max_hop_limit = peer.setdefault("ebgp_multihop", 1) if update_source: - config_data.append("{} update-source {}".format( - neigh_cxt, update_source)) + config_data.append( + "{} update-source {}".format(neigh_cxt, update_source) + ) if disable_connected: - config_data.append("{} disable-connected-check".format( - disable_connected)) + config_data.append( + "{} disable-connected-check".format(disable_connected) + ) if update_source: - config_data.append("{} update-source {}".format(neigh_cxt, - update_source)) + config_data.append( + "{} update-source {}".format(neigh_cxt, update_source) + ) if int(keep_alive) != 60 and int(hold_down) != 180: config_data.append( - "{} timers {} {}".format(neigh_cxt, keep_alive, - hold_down)) + "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down) + ) if password: - config_data.append( - "{} password {}".format(neigh_cxt, password)) + config_data.append("{} password {}".format(neigh_cxt, password)) if max_hop_limit > 1: - config_data.append("{} ebgp-multihop {}".format(neigh_cxt, - max_hop_limit)) + config_data.append( + "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit) + ) config_data.append("{} enforce-multihop".format(neigh_cxt)) logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()") return config_data -def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type, - add_neigh=True): +def __create_bgp_unicast_address_family( + topo, input_dict, router, addr_type, add_neigh=True +): """ API prints bgp global config to bgp_json file. @@ -440,37 +449,34 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type, nh_details = topo[peer_name] # Loopback interface if "source_link" in peer and peer["source_link"] == "lo": - for destRouterLink, data in sorted(nh_details["links"]. - iteritems()): + for destRouterLink, data in sorted(nh_details["links"].iteritems()): if "type" in data and data["type"] == "loopback": if dest_link == destRouterLink: - ip_addr = \ - nh_details["links"][destRouterLink][ - addr_type].split("/")[0] + ip_addr = nh_details["links"][destRouterLink][ + addr_type + ].split("/")[0] # Physical interface else: if dest_link in nh_details["links"].keys(): - ip_addr = nh_details["links"][dest_link][ - addr_type].split("/")[0] + ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0] if addr_type == "ipv4" and bgp_data["ipv6"]: - deactivate = nh_details["links"][ - dest_link]["ipv6"].split("/")[0] + deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[ + 0 + ] neigh_cxt = "neighbor {}".format(ip_addr) - config_data.append("address-family {} unicast".format( - addr_type - )) + config_data.append("address-family {} unicast".format(addr_type)) if deactivate: - config_data.append( - "no neighbor {} activate".format(deactivate)) + config_data.append("no neighbor {} activate".format(deactivate)) next_hop_self = peer.setdefault("next_hop_self", None) send_community = peer.setdefault("send_community", None) prefix_lists = peer.setdefault("prefix_lists", {}) route_maps = peer.setdefault("route_maps", {}) no_send_community = peer.setdefault("no_send_community", None) + allowas_in = peer.setdefault("allowas-in", None) # next-hop-self if next_hop_self: @@ -481,21 +487,30 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type, # no_send_community if no_send_community: - config_data.append("no {} send-community {}".format( - neigh_cxt, no_send_community)) + config_data.append( + "no {} send-community {}".format(neigh_cxt, no_send_community) + ) + + if "allowas_in" in peer: + allow_as_in = peer["allowas_in"] + config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in)) + if "no_allowas_in" in peer: + allow_as_in = peer["no_allowas_in"] + config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in)) if prefix_lists: for prefix_list in prefix_lists: name = prefix_list.setdefault("name", {}) direction = prefix_list.setdefault("direction", "in") del_action = prefix_list.setdefault("delete", False) if not name: - logger.info("Router %s: 'name' not present in " - "input_dict for BGP neighbor prefix lists", - router) + logger.info( + "Router %s: 'name' not present in " + "input_dict for BGP neighbor prefix lists", + router, + ) else: - cmd = "{} prefix-list {} {}".format(neigh_cxt, name, - direction) + cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction) if del_action: cmd = "no {}".format(cmd) config_data.append(cmd) @@ -506,16 +521,28 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type, direction = route_map.setdefault("direction", "in") del_action = route_map.setdefault("delete", False) if not name: - logger.info("Router %s: 'name' not present in " - "input_dict for BGP neighbor route name", - router) + logger.info( + "Router %s: 'name' not present in " + "input_dict for BGP neighbor route name", + router, + ) else: - cmd = "{} route-map {} {}".format(neigh_cxt, name, - direction) + cmd = "{} route-map {} {}".format(neigh_cxt, name, direction) if del_action: cmd = "no {}".format(cmd) config_data.append(cmd) + if allowas_in: + number_occurences = allowas_in.setdefault("number_occurences", {}) + del_action = allowas_in.setdefault("delete", False) + + cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences) + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + return config_data @@ -564,12 +591,10 @@ def verify_router_id(tgen, topo, input_dict): rnode = tgen.routers()[router] - del_router_id = input_dict[router]["bgp"].setdefault( - "del_router_id", False) + del_router_id = input_dict[router]["bgp"].setdefault("del_router_id", False) logger.info("Checking router %s router-id", router) - show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) router_id_out = show_bgp_json["ipv4Unicast"]["routerId"] router_id_out = ipaddr.IPv4Address(unicode(router_id_out)) @@ -582,12 +607,12 @@ def verify_router_id(tgen, topo, input_dict): router_id = ipaddr.IPv4Address(unicode(router_id)) if router_id == router_id_out: - logger.info("Found expected router-id %s for router %s", - router_id, router) + logger.info("Found expected router-id %s for router %s", router_id, router) else: - errormsg = "Router-id for router:{} mismatch, expected:" \ - " {} but found:{}".format(router, router_id, - router_id_out) + errormsg = ( + "Router-id for router:{} mismatch, expected:" + " {} but found:{}".format(router, router_id, router_id_out) + ) return errormsg logger.debug("Exiting lib API: verify_router_id()") @@ -617,9 +642,11 @@ def verify_bgp_convergence(tgen, topo): logger.debug("Entering lib API: verify_bgp_convergence()") for router, rnode in tgen.routers().iteritems(): + if "bgp" not in topo["routers"][router]: + continue + logger.info("Verifying BGP Convergence on router %s", router) - show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) # Verifying output dictionary show_bgp_json is empty or not if not bool(show_bgp_json): errormsg = "BGP is not running" @@ -647,15 +674,12 @@ def verify_bgp_convergence(tgen, topo): for dest_link in peer_data["dest_link"].keys(): data = topo["routers"][bgp_neighbor]["links"] if dest_link in data: - neighbor_ip = \ - data[dest_link][addr_type].split("/")[0] + neighbor_ip = data[dest_link][addr_type].split("/")[0] if addr_type == "ipv4": - ipv4_data = show_bgp_json["ipv4Unicast"][ - "peers"] + ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] nh_state = ipv4_data[neighbor_ip]["state"] else: - ipv6_data = show_bgp_json["ipv6Unicast"][ - "peers"] + ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] nh_state = ipv6_data[neighbor_ip]["state"] if nh_state == "Established": @@ -663,8 +687,7 @@ def verify_bgp_convergence(tgen, topo): if no_of_peer == total_peer: logger.info("BGP is Converged for router %s", router) else: - errormsg = "BGP is not converged for router {}".format( - router) + errormsg = "BGP is not converged for router {}".format(router) return errormsg logger.debug("Exiting API: verify_bgp_convergence()") @@ -707,16 +730,9 @@ def modify_as_number(tgen, topo, input_dict): for router in input_dict.keys(): # Remove bgp configuration - router_dict.update({ - router: { - "bgp": { - "delete": True - } - } - }) + router_dict.update({router: {"bgp": {"delete": True}}}) - new_topo[router]["bgp"]["local_as"] = \ - input_dict[router]["bgp"]["local_as"] + new_topo[router]["bgp"]["local_as"] = input_dict[router]["bgp"]["local_as"] logger.info("Removing bgp configuration") create_router_bgp(tgen, topo, router_dict) @@ -777,8 +793,9 @@ def verify_as_numbers(tgen, topo, input_dict): logger.info("Verifying AS numbers for dut %s:", router) - show_ip_bgp_neighbor_json = run_frr_cmd(rnode, - "show ip bgp neighbor json", isjson=True) + show_ip_bgp_neighbor_json = run_frr_cmd( + rnode, "show ip bgp neighbor json", isjson=True + ) local_as = input_dict[router]["bgp"]["local_as"] bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] @@ -786,8 +803,7 @@ def verify_as_numbers(tgen, topo, input_dict): if not check_address_types(addr_type): continue - bgp_neighbors = bgp_addr_type[addr_type]["unicast"][ - "neighbor"] + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"] @@ -796,32 +812,42 @@ def verify_as_numbers(tgen, topo, input_dict): data = topo["routers"][bgp_neighbor]["links"] if dest_link in data: - neighbor_ip = data[dest_link][addr_type]. \ - split("/")[0] + neighbor_ip = data[dest_link][addr_type].split("/")[0] neigh_data = show_ip_bgp_neighbor_json[neighbor_ip] # Verify Local AS for router if neigh_data["localAs"] != local_as: - errormsg = "Failed: Verify local_as for dut {}," \ - " found: {} but expected: {}".format( - router, neigh_data["localAs"], - local_as) + errormsg = ( + "Failed: Verify local_as for dut {}," + " found: {} but expected: {}".format( + router, neigh_data["localAs"], local_as + ) + ) return errormsg else: - logger.info("Verified local_as for dut %s, found" - " expected: %s", router, local_as) + logger.info( + "Verified local_as for dut %s, found" " expected: %s", + router, + local_as, + ) # Verify Remote AS for neighbor if neigh_data["remoteAs"] != remote_as: - errormsg = "Failed: Verify remote_as for dut " \ - "{}'s neighbor {}, found: {} but " \ - "expected: {}".format( - router, bgp_neighbor, - neigh_data["remoteAs"], remote_as) + errormsg = ( + "Failed: Verify remote_as for dut " + "{}'s neighbor {}, found: {} but " + "expected: {}".format( + router, bgp_neighbor, neigh_data["remoteAs"], remote_as + ) + ) return errormsg else: - logger.info("Verified remote_as for dut %s's " - "neighbor %s, found expected: %s", - router, bgp_neighbor, remote_as) + logger.info( + "Verified remote_as for dut %s's " + "neighbor %s, found expected: %s", + router, + bgp_neighbor, + remote_as, + ) logger.debug("Exiting lib API: verify_AS_numbers()") return True @@ -862,12 +888,14 @@ def clear_bgp_and_verify(tgen, topo, router): for retry in range(31): sleeptime = 3 # Waiting for BGP to converge - logger.info("Waiting for %s sec for BGP to converge on router" - " %s...", sleeptime, router) + logger.info( + "Waiting for %s sec for BGP to converge on router" " %s...", + sleeptime, + router, + ) sleep(sleeptime) - show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) # Verifying output dictionary show_bgp_json is empty or not if not bool(show_bgp_json): errormsg = "BGP is not running" @@ -897,38 +925,39 @@ def clear_bgp_and_verify(tgen, topo, router): if dest_link in data: neighbor_ip = data[dest_link][addr_type].split("/")[0] if addr_type == "ipv4": - ipv4_data = show_bgp_json["ipv4Unicast"][ - "peers"] + ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] nh_state = ipv4_data[neighbor_ip]["state"] # Peer up time dictionary - peer_uptime_before_clear_bgp[bgp_neighbor] = \ - ipv4_data[neighbor_ip]["peerUptimeEstablishedEpoch"] + peer_uptime_before_clear_bgp[bgp_neighbor] = ipv4_data[ + neighbor_ip + ]["peerUptimeEstablishedEpoch"] else: - ipv6_data = show_bgp_json["ipv6Unicast"][ - "peers"] + ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] nh_state = ipv6_data[neighbor_ip]["state"] # Peer up time dictionary - peer_uptime_before_clear_bgp[bgp_neighbor] = \ - ipv6_data[neighbor_ip]["peerUptimeEstablishedEpoch"] + peer_uptime_before_clear_bgp[bgp_neighbor] = ipv6_data[ + neighbor_ip + ]["peerUptimeEstablishedEpoch"] if nh_state == "Established": no_of_peer += 1 if no_of_peer == total_peer: - logger.info("BGP is Converged for router %s before bgp" - " clear", router) + logger.info("BGP is Converged for router %s before bgp" " clear", router) break else: - logger.info("BGP is not yet Converged for router %s " - "before bgp clear", router) + logger.info( + "BGP is not yet Converged for router %s " "before bgp clear", router + ) else: - errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \ - " router {}".format(router) + errormsg = ( + "TIMEOUT!! BGP is not converged in 30 seconds for" + " router {}".format(router) + ) return errormsg - logger.info(peer_uptime_before_clear_bgp) # Clearing BGP logger.info("Clearing BGP neighborship for router %s..", router) for addr_type in bgp_addr_type.keys(): @@ -942,13 +971,14 @@ def clear_bgp_and_verify(tgen, topo, router): for retry in range(31): sleeptime = 3 # Waiting for BGP to converge - logger.info("Waiting for %s sec for BGP to converge on router" - " %s...", sleeptime, router) + logger.info( + "Waiting for %s sec for BGP to converge on router" " %s...", + sleeptime, + router, + ) sleep(sleeptime) - - show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) # Verifying output dictionary show_bgp_json is empty or not if not bool(show_bgp_json): errormsg = "BGP is not running" @@ -975,44 +1005,46 @@ def clear_bgp_and_verify(tgen, topo, router): data = topo["routers"][bgp_neighbor]["links"] if dest_link in data: - neighbor_ip = data[dest_link][addr_type].\ - split("/")[0] + neighbor_ip = data[dest_link][addr_type].split("/")[0] if addr_type == "ipv4": - ipv4_data = show_bgp_json["ipv4Unicast"][ - "peers"] + ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] nh_state = ipv4_data[neighbor_ip]["state"] - peer_uptime_after_clear_bgp[bgp_neighbor] = \ - ipv4_data[neighbor_ip]["peerUptimeEstablishedEpoch"] + peer_uptime_after_clear_bgp[bgp_neighbor] = ipv4_data[ + neighbor_ip + ]["peerUptimeEstablishedEpoch"] else: - ipv6_data = show_bgp_json["ipv6Unicast"][ - "peers"] + ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] nh_state = ipv6_data[neighbor_ip]["state"] # Peer up time dictionary - peer_uptime_after_clear_bgp[bgp_neighbor] = \ - ipv6_data[neighbor_ip]["peerUptimeEstablishedEpoch"] + peer_uptime_after_clear_bgp[bgp_neighbor] = ipv6_data[ + neighbor_ip + ]["peerUptimeEstablishedEpoch"] if nh_state == "Established": no_of_peer += 1 if no_of_peer == total_peer: - logger.info("BGP is Converged for router %s after bgp clear", - router) + logger.info("BGP is Converged for router %s after bgp clear", router) break else: - logger.info("BGP is not yet Converged for router %s after" - " bgp clear", router) + logger.info( + "BGP is not yet Converged for router %s after" " bgp clear", router + ) else: - errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \ - " router {}".format(router) + errormsg = ( + "TIMEOUT!! BGP is not converged in 30 seconds for" + " router {}".format(router) + ) return errormsg - logger.info(peer_uptime_after_clear_bgp) + # Comparing peerUptimeEstablishedEpoch dictionaries if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp: - logger.info("BGP neighborship is reset after clear BGP on router %s", - router) + logger.info("BGP neighborship is reset after clear BGP on router %s", router) else: - errormsg = "BGP neighborship is not reset after clear bgp on router" \ - " {}".format(router) + errormsg = ( + "BGP neighborship is not reset after clear bgp on router" + " {}".format(router) + ) return errormsg logger.debug("Exiting lib API: clear_bgp_and_verify()") @@ -1060,11 +1092,11 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): rnode = router_list[router] - logger.info("Verifying bgp timers functionality, DUT is %s:", - router) + logger.info("Verifying bgp timers functionality, DUT is %s:", router) - show_ip_bgp_neighbor_json = \ - run_frr_cmd(rnode, "show ip bgp neighbor json", isjson=True) + show_ip_bgp_neighbor_json = run_frr_cmd( + rnode, "show ip bgp neighbor json", isjson=True + ) bgp_addr_type = input_dict[router]["bgp"]["address_family"] @@ -1072,8 +1104,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): if not check_address_types(addr_type): continue - bgp_neighbors = bgp_addr_type[addr_type]["unicast"][ - "neighbor"] + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): for dest_link, peer_dict in peer_data["dest_link"].iteritems(): data = topo["routers"][bgp_neighbor]["links"] @@ -1082,32 +1113,41 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): holddowntimer = peer_dict["holddowntimer"] if dest_link in data: - neighbor_ip = data[dest_link][addr_type]. \ - split("/")[0] + neighbor_ip = data[dest_link][addr_type].split("/")[0] neighbor_intf = data[dest_link]["interface"] # Verify HoldDownTimer for neighbor - bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[ - neighbor_ip]["bgpTimerHoldTimeMsecs"] + bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][ + "bgpTimerHoldTimeMsecs" + ] if bgpHoldTimeMsecs != holddowntimer * 1000: - errormsg = "Verifying holddowntimer for bgp " \ - "neighbor {} under dut {}, found: {} " \ - "but expected: {}".format( - neighbor_ip, router, - bgpHoldTimeMsecs, - holddowntimer * 1000) + errormsg = ( + "Verifying holddowntimer for bgp " + "neighbor {} under dut {}, found: {} " + "but expected: {}".format( + neighbor_ip, + router, + bgpHoldTimeMsecs, + holddowntimer * 1000, + ) + ) return errormsg # Verify KeepAliveTimer for neighbor - bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[ - neighbor_ip]["bgpTimerKeepAliveIntervalMsecs"] + bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][ + "bgpTimerKeepAliveIntervalMsecs" + ] if bgpKeepAliveTimeMsecs != keepalivetimer * 1000: - errormsg = "Verifying keepalivetimer for bgp " \ - "neighbor {} under dut {}, found: {} " \ - "but expected: {}".format( - neighbor_ip, router, - bgpKeepAliveTimeMsecs, - keepalivetimer * 1000) + errormsg = ( + "Verifying keepalivetimer for bgp " + "neighbor {} under dut {}, found: {} " + "but expected: {}".format( + neighbor_ip, + router, + bgpKeepAliveTimeMsecs, + keepalivetimer * 1000, + ) + ) return errormsg #################### @@ -1120,40 +1160,50 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): # Wait till keep alive time logger.info("=" * 20) logger.info("Scenario 1:") - logger.info("Shutdown and bring up peer interface: %s " - "in keep alive time : %s sec and verify " - " BGP neighborship is intact in %s sec ", - neighbor_intf, keepalivetimer, - (holddowntimer - keepalivetimer)) + logger.info( + "Shutdown and bring up peer interface: %s " + "in keep alive time : %s sec and verify " + " BGP neighborship is intact in %s sec ", + neighbor_intf, + keepalivetimer, + (holddowntimer - keepalivetimer), + ) logger.info("=" * 20) logger.info("Waiting for %s sec..", keepalivetimer) sleep(keepalivetimer) # Shutting down peer ineterface - logger.info("Shutting down interface %s on router %s", - neighbor_intf, bgp_neighbor) + logger.info( + "Shutting down interface %s on router %s", + neighbor_intf, + bgp_neighbor, + ) topotest.interface_set_status( - router_list[bgp_neighbor], neighbor_intf, - ifaceaction=False) + router_list[bgp_neighbor], neighbor_intf, ifaceaction=False + ) # Bringing up peer interface sleep(5) - logger.info("Bringing up interface %s on router %s..", - neighbor_intf, bgp_neighbor) + logger.info( + "Bringing up interface %s on router %s..", + neighbor_intf, + bgp_neighbor, + ) topotest.interface_set_status( - router_list[bgp_neighbor], neighbor_intf, - ifaceaction=True) + router_list[bgp_neighbor], neighbor_intf, ifaceaction=True + ) # Verifying BGP neighborship is intact in # (holddown - keepalive) time - for timer in range(keepalivetimer, holddowntimer, - int(holddowntimer / 3)): + for timer in range( + keepalivetimer, holddowntimer, int(holddowntimer / 3) + ): logger.info("Waiting for %s sec..", keepalivetimer) sleep(keepalivetimer) sleep(2) - show_bgp_json = \ - run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd( + rnode, "show bgp summary json", isjson=True + ) if addr_type == "ipv4": ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] @@ -1162,17 +1212,22 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] nh_state = ipv6_data[neighbor_ip]["state"] - if timer == \ - (holddowntimer - keepalivetimer): + if timer == (holddowntimer - keepalivetimer): if nh_state != "Established": - errormsg = "BGP neighborship has not gone " \ - "down in {} sec for neighbor {}" \ - .format(timer, bgp_neighbor) + errormsg = ( + "BGP neighborship has not gone " + "down in {} sec for neighbor {}".format( + timer, bgp_neighbor + ) + ) return errormsg else: - logger.info("BGP neighborship is intact in %s" - " sec for neighbor %s", - timer, bgp_neighbor) + logger.info( + "BGP neighborship is intact in %s" + " sec for neighbor %s", + timer, + bgp_neighbor, + ) #################### # Shutting down peer interface and verifying that BGP @@ -1180,27 +1235,36 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): #################### logger.info("=" * 20) logger.info("Scenario 2:") - logger.info("Shutdown peer interface: %s and verify BGP" - " neighborship has gone down in hold down " - "time %s sec", neighbor_intf, holddowntimer) + logger.info( + "Shutdown peer interface: %s and verify BGP" + " neighborship has gone down in hold down " + "time %s sec", + neighbor_intf, + holddowntimer, + ) logger.info("=" * 20) - logger.info("Shutting down interface %s on router %s..", - neighbor_intf, bgp_neighbor) - topotest.interface_set_status(router_list[bgp_neighbor], - neighbor_intf, - ifaceaction=False) + logger.info( + "Shutting down interface %s on router %s..", + neighbor_intf, + bgp_neighbor, + ) + topotest.interface_set_status( + router_list[bgp_neighbor], neighbor_intf, ifaceaction=False + ) # Verifying BGP neighborship is going down in holddown time - for timer in range(keepalivetimer, - (holddowntimer + keepalivetimer), - int(holddowntimer / 3)): + for timer in range( + keepalivetimer, + (holddowntimer + keepalivetimer), + int(holddowntimer / 3), + ): logger.info("Waiting for %s sec..", keepalivetimer) sleep(keepalivetimer) sleep(2) - show_bgp_json = \ - run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd( + rnode, "show bgp summary json", isjson=True + ) if addr_type == "ipv4": ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] @@ -1211,22 +1275,29 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): if timer == holddowntimer: if nh_state == "Established": - errormsg = "BGP neighborship has not gone " \ - "down in {} sec for neighbor {}" \ - .format(timer, bgp_neighbor) + errormsg = ( + "BGP neighborship has not gone " + "down in {} sec for neighbor {}".format( + timer, bgp_neighbor + ) + ) return errormsg else: - logger.info("BGP neighborship has gone down in" - " %s sec for neighbor %s", - timer, bgp_neighbor) + logger.info( + "BGP neighborship has gone down in" + " %s sec for neighbor %s", + timer, + bgp_neighbor, + ) logger.debug("Exiting lib API: verify_bgp_timers_and_functionality()") return True @retry(attempts=3, wait=4, return_is_str=True) -def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, - input_dict, seq_id=None): +def verify_bgp_attributes( + tgen, addr_type, dut, static_routes, rmap_name, input_dict, seq_id=None +): """ API will verify BGP attributes set by Route-map for given prefix and DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command @@ -1256,7 +1327,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, } }, "set": { - "localpref": 150, + "locPrf": 150, "weight": 100 } }], @@ -1269,7 +1340,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, } }, "set": { - "med": 50 + "metric": 50 } }] } @@ -1288,7 +1359,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, if router != dut: continue - logger.info('Verifying BGP set attributes for dut {}:'.format(router)) + logger.info("Verifying BGP set attributes for dut {}:".format(router)) for static_route in static_routes: cmd = "show bgp {} {} json".format(addr_type, static_route) @@ -1297,8 +1368,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, dict_to_test = [] tmp_list = [] for rmap_router in input_dict.keys(): - for rmap, values in input_dict[rmap_router][ - "route_maps"].items(): + for rmap, values in input_dict[rmap_router]["route_maps"].items(): if rmap == rmap_name: dict_to_test = values for rmap_dict in values: @@ -1307,8 +1377,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, seq_id = [seq_id] if "seq_id" in rmap_dict: - rmap_seq_id = \ - rmap_dict["seq_id"] + rmap_seq_id = rmap_dict["seq_id"] for _seq_id in seq_id: if _seq_id == rmap_seq_id: tmp_list.append(rmap_dict) @@ -1318,55 +1387,56 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, for rmap_dict in dict_to_test: if "set" in rmap_dict: for criteria in rmap_dict["set"].keys(): - if criteria not in show_bgp_json[ - "paths"][0]: - errormsg = ("BGP attribute: {}" - " is not found in" - " cli: {} output " - "in router {}". - format(criteria, - cmd, - router)) + if criteria not in show_bgp_json["paths"][0]: + errormsg = ( + "BGP attribute: {}" + " is not found in" + " cli: {} output " + "in router {}".format(criteria, cmd, router) + ) return errormsg - if rmap_dict["set"][criteria] == \ - show_bgp_json["paths"][0][ - criteria]: - logger.info("Verifying BGP " - "attribute {} for" - " route: {} in " - "router: {}, found" - " expected value:" - " {}". - format(criteria, - static_route, - dut, - rmap_dict[ - "set"][ - criteria])) + if ( + rmap_dict["set"][criteria] + == show_bgp_json["paths"][0][criteria] + ): + logger.info( + "Verifying BGP " + "attribute {} for" + " route: {} in " + "router: {}, found" + " expected value:" + " {}".format( + criteria, + static_route, + dut, + rmap_dict["set"][criteria], + ) + ) else: - errormsg = \ - ("Failed: Verifying BGP " - "attribute {} for route:" - " {} in router: {}, " - " expected value: {} but" - " found: {}". - format(criteria, - static_route, - dut, - rmap_dict["set"] - [criteria], - show_bgp_json[ - 'paths'][ - 0][criteria])) + errormsg = ( + "Failed: Verifying BGP " + "attribute {} for route:" + " {} in router: {}, " + " expected value: {} but" + " found: {}".format( + criteria, + static_route, + dut, + rmap_dict["set"][criteria], + show_bgp_json["paths"][0][criteria], + ) + ) return errormsg logger.debug("Exiting lib API: verify_bgp_attributes()") return True + @retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) -def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, - attribute): +def verify_best_path_as_per_bgp_attribute( + tgen, addr_type, router, input_dict, attribute +): """ API is to verify best path according to BGP attributes for given routes. "show bgp ipv4/6 json" command will be run and verify best path according @@ -1406,7 +1476,7 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, } } } - attribute = "localpref" + attribute = "locPrf" result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \ input_dict, attribute) Returns @@ -1443,40 +1513,38 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, attribute_dict[next_hop_ip] = route_attribute[attribute] # AS_PATH attribute - if attribute == "aspath": + if attribute == "path": # Find next_hop for the route have minimum as_path - _next_hop = min(attribute_dict, key=lambda x: len(set( - attribute_dict[x]))) + _next_hop = min( + attribute_dict, key=lambda x: len(set(attribute_dict[x])) + ) compare = "SHORTEST" # LOCAL_PREF attribute - elif attribute == "localpref": + elif attribute == "locPrf": # Find next_hop for the route have highest local preference - _next_hop = max(attribute_dict, key=(lambda k: - attribute_dict[k])) + _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k])) compare = "HIGHEST" # WEIGHT attribute elif attribute == "weight": # Find next_hop for the route have highest weight - _next_hop = max(attribute_dict, key=(lambda k: - attribute_dict[k])) + _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k])) compare = "HIGHEST" # ORIGIN attribute elif attribute == "origin": # Find next_hop for the route have IGP as origin, - # - rule is IGP>EGP>INCOMPLETE - _next_hop = [key for (key, value) in - attribute_dict.iteritems() - if value == "IGP"][0] + _next_hop = [ + key for (key, value) in attribute_dict.iteritems() if value == "IGP" + ][0] compare = "" # MED attribute - elif attribute == "med": + elif attribute == "metric": # Find next_hop for the route have LOWEST MED - _next_hop = min(attribute_dict, key=(lambda k: - attribute_dict[k])) + _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k])) compare = "LOWEST" # Show ip route @@ -1489,8 +1557,7 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, # Verifying output dictionary rib_routes_json is not empty if not bool(rib_routes_json): - errormsg = "No route found in RIB of router {}..". \ - format(router) + errormsg = "No route found in RIB of router {}..".format(router) return errormsg st_found = False @@ -1499,31 +1566,41 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, if route in rib_routes_json: st_found = True # Verify next_hop in rib_routes_json - if rib_routes_json[route][0]["nexthops"][0]["ip"] in \ - attribute_dict: + if rib_routes_json[route][0]["nexthops"][0]["ip"] in attribute_dict: nh_found = True else: - errormsg = "Incorrect Nexthop for BGP route {} in " \ - "RIB of router {}, Expected: {}, Found:" \ - " {}\n".format(route, router, - rib_routes_json[route][0][ - "nexthops"][0]["ip"], - _next_hop) + errormsg = ( + "Incorrect Nexthop for BGP route {} in " + "RIB of router {}, Expected: {}, Found:" + " {}\n".format( + route, + router, + rib_routes_json[route][0]["nexthops"][0]["ip"], + _next_hop, + ) + ) return errormsg if st_found and nh_found: logger.info( "Best path for prefix: %s with next_hop: %s is " "installed according to %s %s: (%s) in RIB of " - "router %s", route, _next_hop, compare, - attribute, attribute_dict[_next_hop], router) + "router %s", + route, + _next_hop, + compare, + attribute, + attribute_dict[_next_hop], + router, + ) logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()") return True -def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, - attribute): +def verify_best_path_as_per_admin_distance( + tgen, addr_type, router, input_dict, attribute +): """ API is to verify best path according to admin distance for given route. "show ip/ipv6 route json" command will be run and verify @@ -1548,7 +1625,7 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, {"network": "200.50.2.0/32", \ "admin_distance": 60, "next_hop": "10.0.0.18"}] }} - attribute = "localpref" + attribute = "locPrf" result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \ input_dict, attribute): Returns @@ -1574,7 +1651,8 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, for routes_from_router in input_dict.keys(): sh_ip_route_json = router_list[routes_from_router].vtysh_cmd( - command, isjson=True) + command, isjson=True + ) networks = input_dict[routes_from_router]["static_routes"] for network in networks: route = network["network"] @@ -1590,8 +1668,7 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, attribute_dict[next_hop_ip] = route_attribute["distance"] # Find next_hop for the route have LOWEST Admin Distance - _next_hop = min(attribute_dict, key=(lambda k: - attribute_dict[k])) + _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k])) compare = "LOWEST" # Show ip route @@ -1608,21 +1685,523 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, if route in rib_routes_json: st_found = True # Verify next_hop in rib_routes_json - if rib_routes_json[route][0]["nexthops"][0]["ip"] == \ - _next_hop: + if rib_routes_json[route][0]["nexthops"][0]["ip"] == _next_hop: nh_found = True else: - errormsg = ("Nexthop {} is Missing for BGP route {}" - " in RIB of router {}\n".format(_next_hop, - route, router)) + errormsg = ( + "Nexthop {} is Missing for BGP route {}" + " in RIB of router {}\n".format(_next_hop, route, router) + ) return errormsg if st_found and nh_found: - logger.info("Best path for prefix: %s is installed according" - " to %s %s: (%s) in RIB of router %s", route, - compare, attribute, - attribute_dict[_next_hop], router) + logger.info( + "Best path for prefix: %s is installed according" + " to %s %s: (%s) in RIB of router %s", + route, + compare, + attribute, + attribute_dict[_next_hop], + router, + ) + + logger.info("Exiting lib API: verify_best_path_as_per_admin_distance()") + return True + + +@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None): + """ + This API is to verify whether bgp rib has any + matching route for a nexthop. + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: input dut router name + * `addr_type` : ip type ipv4/ipv6 + * `input_dict` : input dict, has details of static routes + * `next_hop`[optional]: next_hop which needs to be verified, + default = static + * 'aspath'[optional]: aspath which needs to be verified + + Usage + ----- + dut = 'r1' + next_hop = "192.168.1.10" + input_dict = topo['routers'] + aspath = "100 200 300" + result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict, + next_hop, aspath) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_bgp_rib()") + + router_list = tgen.routers() + additional_nexthops_in_required_nhs = [] + list1 = [] + list2 = [] + for routerInput in input_dict.keys(): + for router, rnode in router_list.iteritems(): + if router != dut: + continue + + # Verifying RIB routes + command = "show bgp" + + # Static routes + sleep(2) + logger.info("Checking router {} BGP RIB:".format(dut)) + + if "static_routes" in input_dict[routerInput]: + static_routes = input_dict[routerInput]["static_routes"] + + for static_route in static_routes: + found_routes = [] + missing_routes = [] + st_found = False + nh_found = False + vrf = static_route.setdefault("vrf", None) + if vrf: + cmd = "{} vrf {} {}".format(command, vrf, addr_type) + + else: + cmd = "{} {}".format(command, addr_type) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) == False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + network = static_route["network"] + + if "no_of_ip" in static_route: + no_of_ip = static_route["no_of_ip"] + else: + no_of_ip = 1 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_ip) + + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json["routes"]: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if not isinstance(next_hop, list): + next_hop = [next_hop] + list1 = next_hop + found_hops = [ + rib_r["ip"] + for rib_r in rib_routes_json["routes"][st_rt][0][ + "nexthops" + ] + ] + list2 = found_hops + missing_list_of_nexthops = set(list2).difference(list1) + additional_nexthops_in_required_nhs = set( + list1 + ).difference(list2) + + if list2: + if additional_nexthops_in_required_nhs: + logger.info( + "Missing nexthop %s for route" + " %s in RIB of router %s\n", + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) + return errormsg + else: + nh_found = True + if aspath: + found_paths = rib_routes_json["routes"][st_rt][0][ + "path" + ] + if aspath == found_paths: + aspath_found = True + logger.info( + "Found AS path {} for route" + " {} in RIB of router " + "{}\n".format(aspath, st_rt, dut) + ) + else: + errormsg = ( + "AS Path {} is missing for route" + "for route {} in RIB of router {}\n".format( + aspath, st_rt, dut + ) + ) + return errormsg + + else: + missing_routes.append(st_rt) + + if nh_found: + logger.info( + "Found next_hop {} for all bgp" + " routes in RIB of" + " router {}\n".format(next_hop, router) + ) + + if len(missing_routes) > 0: + errormsg = ( + "Missing route in RIB of router {}, " + "routes: {}\n".format(dut, missing_routes) + ) + return errormsg + + if found_routes: + logger.info( + "Verified routes in router {} BGP RIB, " + "found routes are: {} \n".format(dut, found_routes) + ) + continue + + if "bgp" not in input_dict[routerInput]: + continue + + # Advertise networks + bgp_data_list = input_dict[routerInput]["bgp"] + + if type(bgp_data_list) is not list: + bgp_data_list = [bgp_data_list] + + for bgp_data in bgp_data_list: + vrf_id = bgp_data.setdefault("vrf", None) + if vrf_id: + cmd = "{} vrf {} {}".format(command, vrf_id, addr_type) + else: + cmd = "{} {}".format(command, addr_type) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) == False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"] + advertise_network = bgp_net_advertise.setdefault( + "advertise_networks", [] + ) + + for advertise_network_dict in advertise_network: + found_routes = [] + missing_routes = [] + found = False + + network = advertise_network_dict["network"] + + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 1 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_network) + + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json["routes"]: + found = True + found_routes.append(st_rt) + else: + found = False + missing_routes.append(st_rt) + + if len(missing_routes) > 0: + errormsg = ( + "Missing route in BGP RIB of router {}," + " are: {}\n".format(dut, missing_routes) + ) + return errormsg + + if found_routes: + logger.info( + "Verified routes in router {} BGP RIB, found " + "routes are: {}\n".format(dut, found_routes) + ) + + logger.debug("Exiting lib API: verify_bgp_rib()") + return True + + +@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None): + """ + This API is to verify whether bgp rib has any + matching route for a nexthop. + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: input dut router name + * `addr_type` : ip type ipv4/ipv6 + * `input_dict` : input dict, has details of static routes + * `next_hop`[optional]: next_hop which needs to be verified, + default = static + * 'aspath'[optional]: aspath which needs to be verified + + Usage + ----- + dut = 'r1' + next_hop = "192.168.1.10" + input_dict = topo['routers'] + aspath = "100 200 300" + result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict, + next_hop, aspath) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_bgp_rib()") + + router_list = tgen.routers() + additional_nexthops_in_required_nhs = [] + list1 = [] + list2 = [] + for routerInput in input_dict.keys(): + for router, rnode in router_list.iteritems(): + if router != dut: + continue + + # Verifying RIB routes + command = "show bgp" + + # Static routes + sleep(2) + logger.info("Checking router {} BGP RIB:".format(dut)) + + if "static_routes" in input_dict[routerInput]: + static_routes = input_dict[routerInput]["static_routes"] + + for static_route in static_routes: + found_routes = [] + missing_routes = [] + st_found = False + nh_found = False + vrf = static_route.setdefault("vrf", None) + if vrf: + cmd = "{} vrf {} {}".format(command, vrf, addr_type) + + else: + cmd = "{} {}".format(command, addr_type) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) == False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + network = static_route["network"] + + if "no_of_ip" in static_route: + no_of_ip = static_route["no_of_ip"] + else: + no_of_ip = 1 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_ip) + + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json["routes"]: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if not isinstance(next_hop, list): + next_hop = [next_hop] + list1 = next_hop + found_hops = [ + rib_r["ip"] + for rib_r in rib_routes_json["routes"][st_rt][0][ + "nexthops" + ] + ] + list2 = found_hops + missing_list_of_nexthops = set(list2).difference(list1) + additional_nexthops_in_required_nhs = set( + list1 + ).difference(list2) + + if list2: + if additional_nexthops_in_required_nhs: + logger.info( + "Missing nexthop %s for route" + " %s in RIB of router %s\n", + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) + return errormsg + else: + nh_found = True + if aspath: + found_paths = rib_routes_json["routes"][st_rt][0][ + "path" + ] + if aspath == found_paths: + aspath_found = True + logger.info( + "Found AS path {} for route" + " {} in RIB of router " + "{}\n".format(aspath, st_rt, dut) + ) + else: + errormsg = ( + "AS Path {} is missing for route" + "for route {} in RIB of router {}\n".format( + aspath, st_rt, dut + ) + ) + return errormsg + + else: + missing_routes.append(st_rt) + + if nh_found: + logger.info( + "Found next_hop {} for all bgp" + " routes in RIB of" + " router {}\n".format(next_hop, router) + ) + + if len(missing_routes) > 0: + errormsg = ( + "Missing route in RIB of router {}, " + "routes: {}\n".format(dut, missing_routes) + ) + return errormsg + + if found_routes: + logger.info( + "Verified routes in router {} BGP RIB, " + "found routes are: {} \n".format(dut, found_routes) + ) + continue + + if "bgp" not in input_dict[routerInput]: + continue + + # Advertise networks + bgp_data_list = input_dict[routerInput]["bgp"] + + if type(bgp_data_list) is not list: + bgp_data_list = [bgp_data_list] + + for bgp_data in bgp_data_list: + vrf_id = bgp_data.setdefault("vrf", None) + if vrf_id: + cmd = "{} vrf {} {}".format(command, vrf_id, addr_type) + else: + cmd = "{} {}".format(command, addr_type) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) == False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"] + advertise_network = bgp_net_advertise.setdefault( + "advertise_networks", [] + ) + + for advertise_network_dict in advertise_network: + found_routes = [] + missing_routes = [] + found = False + + network = advertise_network_dict["network"] + + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 1 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_network) + + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json["routes"]: + found = True + found_routes.append(st_rt) + else: + found = False + missing_routes.append(st_rt) + + if len(missing_routes) > 0: + errormsg = ( + "Missing route in BGP RIB of router {}," + " are: {}\n".format(dut, missing_routes) + ) + return errormsg + + if found_routes: + logger.info( + "Verified routes in router {} BGP RIB, found " + "routes are: {}\n".format(dut, found_routes) + ) - logger.info( - "Exiting lib API: verify_best_path_as_per_admin_distance()") + logger.debug("Exiting lib API: verify_bgp_rib()") return True |
