diff options
Diffstat (limited to 'tests/topotests/lib/bgp.py')
| -rw-r--r-- | tests/topotests/lib/bgp.py | 737 |
1 files changed, 395 insertions, 342 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 15f970ba75..d183fb9f60 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -27,13 +27,16 @@ from lib import topotest from lib.topolog import logger # Import common_config to use commomnly used APIs -from lib.common_config import (create_common_configuration, - InvalidCLIError, - load_config_to_router, - check_address_types, - generate_ips, - find_interface_with_greater_ip, - run_frr_cmd, retry) +from lib.common_config import ( + create_common_configuration, + InvalidCLIError, + load_config_to_router, + check_address_types, + generate_ips, + find_interface_with_greater_ip, + run_frr_cmd, + retry, +) BGP_CONVERGENCE_TIMEOUT = 10 @@ -126,24 +129,31 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False): bgp_addr_data = bgp_data.setdefault("address_family", {}) if not bgp_addr_data: - logger.debug("Router %s: 'address_family' not present in " - "input_dict for BGP", router) + logger.debug( + "Router %s: 'address_family' not present in " "input_dict for BGP", + router, + ) else: ipv4_data = bgp_addr_data.setdefault("ipv4", {}) ipv6_data = bgp_addr_data.setdefault("ipv6", {}) - neigh_unicast = True if ipv4_data.setdefault("unicast", {}) \ - or ipv6_data.setdefault("unicast", {}) else False + neigh_unicast = ( + True + if ipv4_data.setdefault("unicast", {}) + or ipv6_data.setdefault("unicast", {}) + else False + ) if neigh_unicast: data_all_bgp = __create_bgp_unicast_neighbor( - tgen, topo, input_dict, router, - config_data=data_all_bgp) + tgen, topo, input_dict, router, config_data=data_all_bgp + ) try: - result = create_common_configuration(tgen, router, data_all_bgp, - "bgp", build) + result = create_common_configuration( + tgen, router, data_all_bgp, "bgp", build + ) except InvalidCLIError: # Traceback errormsg = traceback.format_exc() @@ -182,8 +192,9 @@ def __create_bgp_global(tgen, input_dict, router, build=False): config_data = [] if "local_as" not in bgp_data and build: - logger.error("Router %s: 'local_as' not present in input_dict" - "for BGP", router) + logger.error( + "Router %s: 'local_as' not present in input_dict" "for BGP", router + ) return False local_as = bgp_data.setdefault("local_as", "") @@ -199,14 +210,12 @@ def __create_bgp_global(tgen, input_dict, router, build=False): if del_router_id: config_data.append("no bgp router-id") if router_id: - config_data.append("bgp router-id {}".format( - router_id)) + config_data.append("bgp router-id {}".format(router_id)) return config_data -def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, - config_data=None): +def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, config_data=None): """ Helper API to create configuration for address-family unicast @@ -235,11 +244,8 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, addr_data = addr_dict["unicast"] if addr_data: - config_data.append("address-family {} unicast".format( - addr_type - )) - advertise_network = addr_data.setdefault("advertise_networks", - []) + config_data.append("address-family {} unicast".format(addr_type)) + advertise_network = addr_data.setdefault("advertise_networks", []) for advertise_network_dict in advertise_network: network = advertise_network_dict["network"] if type(network) is not list: @@ -250,12 +256,10 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, else: no_of_network = 1 - del_action = advertise_network_dict.setdefault("delete", - False) + del_action = advertise_network_dict.setdefault("delete", False) # Generating IPs for verification - prefix = str( - ipaddr.IPNetwork(unicode(network[0])).prefixlen) + prefix = str(ipaddr.IPNetwork(unicode(network[0])).prefixlen) network_list = generate_ips(network, no_of_network) for ip in network_list: ip = str(ipaddr.IPNetwork(unicode(ip)).network) @@ -271,20 +275,17 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, ibgp = max_paths.setdefault("ibgp", None) ebgp = max_paths.setdefault("ebgp", None) if ibgp: - config_data.append("maximum-paths ibgp {}".format( - ibgp - )) + config_data.append("maximum-paths ibgp {}".format(ibgp)) if ebgp: - config_data.append("maximum-paths {}".format( - ebgp - )) + config_data.append("maximum-paths {}".format(ebgp)) aggregate_addresses = addr_data.setdefault("aggregate_address", []) for aggregate_address in aggregate_addresses: network = aggregate_address.setdefault("network", None) if not network: - logger.debug("Router %s: 'network' not present in " - "input_dict for BGP", router) + logger.debug( + "Router %s: 'network' not present in " "input_dict for BGP", router + ) else: cmd = "aggregate-address {}".format(network) @@ -305,13 +306,12 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, if redistribute_data: for redistribute in redistribute_data: if "redist_type" not in redistribute: - logger.error("Router %s: 'redist_type' not present in " - "input_dict", router) + logger.error( + "Router %s: 'redist_type' not present in " "input_dict", router + ) else: - cmd = "redistribute {}".format( - redistribute["redist_type"]) - redist_attr = redistribute.setdefault("attribute", - None) + cmd = "redistribute {}".format(redistribute["redist_type"]) + redist_attr = redistribute.setdefault("attribute", None) if redist_attr: cmd = "{} {}".format(cmd, redist_attr) del_action = redistribute.setdefault("delete", False) @@ -320,8 +320,9 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, config_data.append(cmd) if "neighbor" in addr_data: - neigh_data = __create_bgp_neighbor(topo, input_dict, - router, addr_type, add_neigh) + neigh_data = __create_bgp_neighbor( + topo, input_dict, router, addr_type, add_neigh + ) config_data.extend(neigh_data) for addr_type, addr_dict in bgp_data.iteritems(): @@ -331,11 +332,11 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, addr_data = addr_dict["unicast"] if "neighbor" in addr_data: neigh_addr_data = __create_bgp_unicast_address_family( - topo, input_dict, router, addr_type, add_neigh) + topo, input_dict, router, addr_type, add_neigh + ) config_data.extend(neigh_addr_data) - logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()") return config_data @@ -365,12 +366,10 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): update_source = None if dest_link in nh_details["links"].keys(): - ip_addr = \ - nh_details["links"][dest_link][addr_type].split("/")[0] + ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0] # Loopback interface if "source_link" in peer and peer["source_link"] == "lo": - update_source = topo[router]["links"]["lo"][ - addr_type].split("/")[0] + update_source = topo[router]["links"]["lo"][addr_type].split("/")[0] neigh_cxt = "neighbor {}".format(ip_addr) @@ -380,41 +379,44 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): config_data.append("address-family ipv6 unicast") config_data.append("{} activate".format(neigh_cxt)) - disable_connected = peer.setdefault("disable_connected_check", - False) + disable_connected = peer.setdefault("disable_connected_check", False) keep_alive = peer.setdefault("keepalivetimer", 60) hold_down = peer.setdefault("holddowntimer", 180) password = peer.setdefault("password", None) max_hop_limit = peer.setdefault("ebgp_multihop", 1) if update_source: - config_data.append("{} update-source {}".format( - neigh_cxt, update_source)) + config_data.append( + "{} update-source {}".format(neigh_cxt, update_source) + ) if disable_connected: - config_data.append("{} disable-connected-check".format( - disable_connected)) + config_data.append( + "{} disable-connected-check".format(disable_connected) + ) if update_source: - config_data.append("{} update-source {}".format(neigh_cxt, - update_source)) + config_data.append( + "{} update-source {}".format(neigh_cxt, update_source) + ) if int(keep_alive) != 60 and int(hold_down) != 180: config_data.append( - "{} timers {} {}".format(neigh_cxt, keep_alive, - hold_down)) + "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down) + ) if password: - config_data.append( - "{} password {}".format(neigh_cxt, password)) + config_data.append("{} password {}".format(neigh_cxt, password)) if max_hop_limit > 1: - config_data.append("{} ebgp-multihop {}".format(neigh_cxt, - max_hop_limit)) + config_data.append( + "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit) + ) config_data.append("{} enforce-multihop".format(neigh_cxt)) logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()") return config_data -def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type, - add_neigh=True): +def __create_bgp_unicast_address_family( + topo, input_dict, router, addr_type, add_neigh=True +): """ API prints bgp global config to bgp_json file. @@ -440,31 +442,27 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type, nh_details = topo[peer_name] # Loopback interface if "source_link" in peer and peer["source_link"] == "lo": - for destRouterLink, data in sorted(nh_details["links"]. - iteritems()): + for destRouterLink, data in sorted(nh_details["links"].iteritems()): if "type" in data and data["type"] == "loopback": if dest_link == destRouterLink: - ip_addr = \ - nh_details["links"][destRouterLink][ - addr_type].split("/")[0] + ip_addr = nh_details["links"][destRouterLink][ + addr_type + ].split("/")[0] # Physical interface else: if dest_link in nh_details["links"].keys(): - ip_addr = nh_details["links"][dest_link][ - addr_type].split("/")[0] + ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0] if addr_type == "ipv4" and bgp_data["ipv6"]: - deactivate = nh_details["links"][ - dest_link]["ipv6"].split("/")[0] + deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[ + 0 + ] neigh_cxt = "neighbor {}".format(ip_addr) - config_data.append("address-family {} unicast".format( - addr_type - )) + config_data.append("address-family {} unicast".format(addr_type)) if deactivate: - config_data.append( - "no neighbor {} activate".format(deactivate)) + config_data.append("no neighbor {} activate".format(deactivate)) next_hop_self = peer.setdefault("next_hop_self", None) send_community = peer.setdefault("send_community", None) @@ -481,8 +479,9 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type, # no_send_community if no_send_community: - config_data.append("no {} send-community {}".format( - neigh_cxt, no_send_community)) + config_data.append( + "no {} send-community {}".format(neigh_cxt, no_send_community) + ) if prefix_lists: for prefix_list in prefix_lists: @@ -490,12 +489,13 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type, direction = prefix_list.setdefault("direction", "in") del_action = prefix_list.setdefault("delete", False) if not name: - logger.info("Router %s: 'name' not present in " - "input_dict for BGP neighbor prefix lists", - router) + logger.info( + "Router %s: 'name' not present in " + "input_dict for BGP neighbor prefix lists", + router, + ) else: - cmd = "{} prefix-list {} {}".format(neigh_cxt, name, - direction) + cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction) if del_action: cmd = "no {}".format(cmd) config_data.append(cmd) @@ -506,12 +506,13 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type, direction = route_map.setdefault("direction", "in") del_action = route_map.setdefault("delete", False) if not name: - logger.info("Router %s: 'name' not present in " - "input_dict for BGP neighbor route name", - router) + logger.info( + "Router %s: 'name' not present in " + "input_dict for BGP neighbor route name", + router, + ) else: - cmd = "{} route-map {} {}".format(neigh_cxt, name, - direction) + cmd = "{} route-map {} {}".format(neigh_cxt, name, direction) if del_action: cmd = "no {}".format(cmd) config_data.append(cmd) @@ -564,12 +565,10 @@ def verify_router_id(tgen, topo, input_dict): rnode = tgen.routers()[router] - del_router_id = input_dict[router]["bgp"].setdefault( - "del_router_id", False) + del_router_id = input_dict[router]["bgp"].setdefault("del_router_id", False) logger.info("Checking router %s router-id", router) - show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) router_id_out = show_bgp_json["ipv4Unicast"]["routerId"] router_id_out = ipaddr.IPv4Address(unicode(router_id_out)) @@ -582,12 +581,12 @@ def verify_router_id(tgen, topo, input_dict): router_id = ipaddr.IPv4Address(unicode(router_id)) if router_id == router_id_out: - logger.info("Found expected router-id %s for router %s", - router_id, router) + logger.info("Found expected router-id %s for router %s", router_id, router) else: - errormsg = "Router-id for router:{} mismatch, expected:" \ - " {} but found:{}".format(router, router_id, - router_id_out) + errormsg = ( + "Router-id for router:{} mismatch, expected:" + " {} but found:{}".format(router, router_id, router_id_out) + ) return errormsg logger.debug("Exiting lib API: verify_router_id()") @@ -618,8 +617,7 @@ def verify_bgp_convergence(tgen, topo): logger.debug("Entering lib API: verify_bgp_convergence()") for router, rnode in tgen.routers().iteritems(): logger.info("Verifying BGP Convergence on router %s", router) - show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) # Verifying output dictionary show_bgp_json is empty or not if not bool(show_bgp_json): errormsg = "BGP is not running" @@ -647,15 +645,12 @@ def verify_bgp_convergence(tgen, topo): for dest_link in peer_data["dest_link"].keys(): data = topo["routers"][bgp_neighbor]["links"] if dest_link in data: - neighbor_ip = \ - data[dest_link][addr_type].split("/")[0] + neighbor_ip = data[dest_link][addr_type].split("/")[0] if addr_type == "ipv4": - ipv4_data = show_bgp_json["ipv4Unicast"][ - "peers"] + ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] nh_state = ipv4_data[neighbor_ip]["state"] else: - ipv6_data = show_bgp_json["ipv6Unicast"][ - "peers"] + ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] nh_state = ipv6_data[neighbor_ip]["state"] if nh_state == "Established": @@ -663,8 +658,7 @@ def verify_bgp_convergence(tgen, topo): if no_of_peer == total_peer: logger.info("BGP is Converged for router %s", router) else: - errormsg = "BGP is not converged for router {}".format( - router) + errormsg = "BGP is not converged for router {}".format(router) return errormsg logger.debug("Exiting API: verify_bgp_convergence()") @@ -707,16 +701,9 @@ def modify_as_number(tgen, topo, input_dict): for router in input_dict.keys(): # Remove bgp configuration - router_dict.update({ - router: { - "bgp": { - "delete": True - } - } - }) + router_dict.update({router: {"bgp": {"delete": True}}}) - new_topo[router]["bgp"]["local_as"] = \ - input_dict[router]["bgp"]["local_as"] + new_topo[router]["bgp"]["local_as"] = input_dict[router]["bgp"]["local_as"] logger.info("Removing bgp configuration") create_router_bgp(tgen, topo, router_dict) @@ -777,8 +764,9 @@ def verify_as_numbers(tgen, topo, input_dict): logger.info("Verifying AS numbers for dut %s:", router) - show_ip_bgp_neighbor_json = run_frr_cmd(rnode, - "show ip bgp neighbor json", isjson=True) + show_ip_bgp_neighbor_json = run_frr_cmd( + rnode, "show ip bgp neighbor json", isjson=True + ) local_as = input_dict[router]["bgp"]["local_as"] bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] @@ -786,8 +774,7 @@ def verify_as_numbers(tgen, topo, input_dict): if not check_address_types(addr_type): continue - bgp_neighbors = bgp_addr_type[addr_type]["unicast"][ - "neighbor"] + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"] @@ -796,32 +783,42 @@ def verify_as_numbers(tgen, topo, input_dict): data = topo["routers"][bgp_neighbor]["links"] if dest_link in data: - neighbor_ip = data[dest_link][addr_type]. \ - split("/")[0] + neighbor_ip = data[dest_link][addr_type].split("/")[0] neigh_data = show_ip_bgp_neighbor_json[neighbor_ip] # Verify Local AS for router if neigh_data["localAs"] != local_as: - errormsg = "Failed: Verify local_as for dut {}," \ - " found: {} but expected: {}".format( - router, neigh_data["localAs"], - local_as) + errormsg = ( + "Failed: Verify local_as for dut {}," + " found: {} but expected: {}".format( + router, neigh_data["localAs"], local_as + ) + ) return errormsg else: - logger.info("Verified local_as for dut %s, found" - " expected: %s", router, local_as) + logger.info( + "Verified local_as for dut %s, found" " expected: %s", + router, + local_as, + ) # Verify Remote AS for neighbor if neigh_data["remoteAs"] != remote_as: - errormsg = "Failed: Verify remote_as for dut " \ - "{}'s neighbor {}, found: {} but " \ - "expected: {}".format( - router, bgp_neighbor, - neigh_data["remoteAs"], remote_as) + errormsg = ( + "Failed: Verify remote_as for dut " + "{}'s neighbor {}, found: {} but " + "expected: {}".format( + router, bgp_neighbor, neigh_data["remoteAs"], remote_as + ) + ) return errormsg else: - logger.info("Verified remote_as for dut %s's " - "neighbor %s, found expected: %s", - router, bgp_neighbor, remote_as) + logger.info( + "Verified remote_as for dut %s's " + "neighbor %s, found expected: %s", + router, + bgp_neighbor, + remote_as, + ) logger.debug("Exiting lib API: verify_AS_numbers()") return True @@ -862,12 +859,14 @@ def clear_bgp_and_verify(tgen, topo, router): for retry in range(31): sleeptime = 3 # Waiting for BGP to converge - logger.info("Waiting for %s sec for BGP to converge on router" - " %s...", sleeptime, router) + logger.info( + "Waiting for %s sec for BGP to converge on router" " %s...", + sleeptime, + router, + ) sleep(sleeptime) - show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) # Verifying output dictionary show_bgp_json is empty or not if not bool(show_bgp_json): errormsg = "BGP is not running" @@ -897,35 +896,37 @@ def clear_bgp_and_verify(tgen, topo, router): if dest_link in data: neighbor_ip = data[dest_link][addr_type].split("/")[0] if addr_type == "ipv4": - ipv4_data = show_bgp_json["ipv4Unicast"][ - "peers"] + ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] nh_state = ipv4_data[neighbor_ip]["state"] # Peer up time dictionary - peer_uptime_before_clear_bgp[bgp_neighbor] = \ - ipv4_data[neighbor_ip]["peerUptimeEstablishedEpoch"] + peer_uptime_before_clear_bgp[bgp_neighbor] = ipv4_data[ + neighbor_ip + ]["peerUptimeEstablishedEpoch"] else: - ipv6_data = show_bgp_json["ipv6Unicast"][ - "peers"] + ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] nh_state = ipv6_data[neighbor_ip]["state"] # Peer up time dictionary - peer_uptime_before_clear_bgp[bgp_neighbor] = \ - ipv6_data[neighbor_ip]["peerUptimeEstablishedEpoch"] + peer_uptime_before_clear_bgp[bgp_neighbor] = ipv6_data[ + neighbor_ip + ]["peerUptimeEstablishedEpoch"] if nh_state == "Established": no_of_peer += 1 if no_of_peer == total_peer: - logger.info("BGP is Converged for router %s before bgp" - " clear", router) + logger.info("BGP is Converged for router %s before bgp" " clear", router) break else: - logger.info("BGP is not yet Converged for router %s " - "before bgp clear", router) + logger.info( + "BGP is not yet Converged for router %s " "before bgp clear", router + ) else: - errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \ - " router {}".format(router) + errormsg = ( + "TIMEOUT!! BGP is not converged in 30 seconds for" + " router {}".format(router) + ) return errormsg logger.info(peer_uptime_before_clear_bgp) @@ -942,13 +943,14 @@ def clear_bgp_and_verify(tgen, topo, router): for retry in range(31): sleeptime = 3 # Waiting for BGP to converge - logger.info("Waiting for %s sec for BGP to converge on router" - " %s...", sleeptime, router) + logger.info( + "Waiting for %s sec for BGP to converge on router" " %s...", + sleeptime, + router, + ) sleep(sleeptime) - - show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) # Verifying output dictionary show_bgp_json is empty or not if not bool(show_bgp_json): errormsg = "BGP is not running" @@ -975,44 +977,46 @@ def clear_bgp_and_verify(tgen, topo, router): data = topo["routers"][bgp_neighbor]["links"] if dest_link in data: - neighbor_ip = data[dest_link][addr_type].\ - split("/")[0] + neighbor_ip = data[dest_link][addr_type].split("/")[0] if addr_type == "ipv4": - ipv4_data = show_bgp_json["ipv4Unicast"][ - "peers"] + ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] nh_state = ipv4_data[neighbor_ip]["state"] - peer_uptime_after_clear_bgp[bgp_neighbor] = \ - ipv4_data[neighbor_ip]["peerUptimeEstablishedEpoch"] + peer_uptime_after_clear_bgp[bgp_neighbor] = ipv4_data[ + neighbor_ip + ]["peerUptimeEstablishedEpoch"] else: - ipv6_data = show_bgp_json["ipv6Unicast"][ - "peers"] + ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] nh_state = ipv6_data[neighbor_ip]["state"] # Peer up time dictionary - peer_uptime_after_clear_bgp[bgp_neighbor] = \ - ipv6_data[neighbor_ip]["peerUptimeEstablishedEpoch"] + peer_uptime_after_clear_bgp[bgp_neighbor] = ipv6_data[ + neighbor_ip + ]["peerUptimeEstablishedEpoch"] if nh_state == "Established": no_of_peer += 1 if no_of_peer == total_peer: - logger.info("BGP is Converged for router %s after bgp clear", - router) + logger.info("BGP is Converged for router %s after bgp clear", router) break else: - logger.info("BGP is not yet Converged for router %s after" - " bgp clear", router) + logger.info( + "BGP is not yet Converged for router %s after" " bgp clear", router + ) else: - errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \ - " router {}".format(router) + errormsg = ( + "TIMEOUT!! BGP is not converged in 30 seconds for" + " router {}".format(router) + ) return errormsg logger.info(peer_uptime_after_clear_bgp) # Comparing peerUptimeEstablishedEpoch dictionaries if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp: - logger.info("BGP neighborship is reset after clear BGP on router %s", - router) + logger.info("BGP neighborship is reset after clear BGP on router %s", router) else: - errormsg = "BGP neighborship is not reset after clear bgp on router" \ - " {}".format(router) + errormsg = ( + "BGP neighborship is not reset after clear bgp on router" + " {}".format(router) + ) return errormsg logger.debug("Exiting lib API: clear_bgp_and_verify()") @@ -1060,11 +1064,11 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): rnode = router_list[router] - logger.info("Verifying bgp timers functionality, DUT is %s:", - router) + logger.info("Verifying bgp timers functionality, DUT is %s:", router) - show_ip_bgp_neighbor_json = \ - run_frr_cmd(rnode, "show ip bgp neighbor json", isjson=True) + show_ip_bgp_neighbor_json = run_frr_cmd( + rnode, "show ip bgp neighbor json", isjson=True + ) bgp_addr_type = input_dict[router]["bgp"]["address_family"] @@ -1072,8 +1076,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): if not check_address_types(addr_type): continue - bgp_neighbors = bgp_addr_type[addr_type]["unicast"][ - "neighbor"] + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): for dest_link, peer_dict in peer_data["dest_link"].iteritems(): data = topo["routers"][bgp_neighbor]["links"] @@ -1082,32 +1085,41 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): holddowntimer = peer_dict["holddowntimer"] if dest_link in data: - neighbor_ip = data[dest_link][addr_type]. \ - split("/")[0] + neighbor_ip = data[dest_link][addr_type].split("/")[0] neighbor_intf = data[dest_link]["interface"] # Verify HoldDownTimer for neighbor - bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[ - neighbor_ip]["bgpTimerHoldTimeMsecs"] + bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][ + "bgpTimerHoldTimeMsecs" + ] if bgpHoldTimeMsecs != holddowntimer * 1000: - errormsg = "Verifying holddowntimer for bgp " \ - "neighbor {} under dut {}, found: {} " \ - "but expected: {}".format( - neighbor_ip, router, - bgpHoldTimeMsecs, - holddowntimer * 1000) + errormsg = ( + "Verifying holddowntimer for bgp " + "neighbor {} under dut {}, found: {} " + "but expected: {}".format( + neighbor_ip, + router, + bgpHoldTimeMsecs, + holddowntimer * 1000, + ) + ) return errormsg # Verify KeepAliveTimer for neighbor - bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[ - neighbor_ip]["bgpTimerKeepAliveIntervalMsecs"] + bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][ + "bgpTimerKeepAliveIntervalMsecs" + ] if bgpKeepAliveTimeMsecs != keepalivetimer * 1000: - errormsg = "Verifying keepalivetimer for bgp " \ - "neighbor {} under dut {}, found: {} " \ - "but expected: {}".format( - neighbor_ip, router, - bgpKeepAliveTimeMsecs, - keepalivetimer * 1000) + errormsg = ( + "Verifying keepalivetimer for bgp " + "neighbor {} under dut {}, found: {} " + "but expected: {}".format( + neighbor_ip, + router, + bgpKeepAliveTimeMsecs, + keepalivetimer * 1000, + ) + ) return errormsg #################### @@ -1120,40 +1132,50 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): # Wait till keep alive time logger.info("=" * 20) logger.info("Scenario 1:") - logger.info("Shutdown and bring up peer interface: %s " - "in keep alive time : %s sec and verify " - " BGP neighborship is intact in %s sec ", - neighbor_intf, keepalivetimer, - (holddowntimer - keepalivetimer)) + logger.info( + "Shutdown and bring up peer interface: %s " + "in keep alive time : %s sec and verify " + " BGP neighborship is intact in %s sec ", + neighbor_intf, + keepalivetimer, + (holddowntimer - keepalivetimer), + ) logger.info("=" * 20) logger.info("Waiting for %s sec..", keepalivetimer) sleep(keepalivetimer) # Shutting down peer ineterface - logger.info("Shutting down interface %s on router %s", - neighbor_intf, bgp_neighbor) + logger.info( + "Shutting down interface %s on router %s", + neighbor_intf, + bgp_neighbor, + ) topotest.interface_set_status( - router_list[bgp_neighbor], neighbor_intf, - ifaceaction=False) + router_list[bgp_neighbor], neighbor_intf, ifaceaction=False + ) # Bringing up peer interface sleep(5) - logger.info("Bringing up interface %s on router %s..", - neighbor_intf, bgp_neighbor) + logger.info( + "Bringing up interface %s on router %s..", + neighbor_intf, + bgp_neighbor, + ) topotest.interface_set_status( - router_list[bgp_neighbor], neighbor_intf, - ifaceaction=True) + router_list[bgp_neighbor], neighbor_intf, ifaceaction=True + ) # Verifying BGP neighborship is intact in # (holddown - keepalive) time - for timer in range(keepalivetimer, holddowntimer, - int(holddowntimer / 3)): + for timer in range( + keepalivetimer, holddowntimer, int(holddowntimer / 3) + ): logger.info("Waiting for %s sec..", keepalivetimer) sleep(keepalivetimer) sleep(2) - show_bgp_json = \ - run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd( + rnode, "show bgp summary json", isjson=True + ) if addr_type == "ipv4": ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] @@ -1162,17 +1184,22 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] nh_state = ipv6_data[neighbor_ip]["state"] - if timer == \ - (holddowntimer - keepalivetimer): + if timer == (holddowntimer - keepalivetimer): if nh_state != "Established": - errormsg = "BGP neighborship has not gone " \ - "down in {} sec for neighbor {}" \ - .format(timer, bgp_neighbor) + errormsg = ( + "BGP neighborship has not gone " + "down in {} sec for neighbor {}".format( + timer, bgp_neighbor + ) + ) return errormsg else: - logger.info("BGP neighborship is intact in %s" - " sec for neighbor %s", - timer, bgp_neighbor) + logger.info( + "BGP neighborship is intact in %s" + " sec for neighbor %s", + timer, + bgp_neighbor, + ) #################### # Shutting down peer interface and verifying that BGP @@ -1180,27 +1207,36 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): #################### logger.info("=" * 20) logger.info("Scenario 2:") - logger.info("Shutdown peer interface: %s and verify BGP" - " neighborship has gone down in hold down " - "time %s sec", neighbor_intf, holddowntimer) + logger.info( + "Shutdown peer interface: %s and verify BGP" + " neighborship has gone down in hold down " + "time %s sec", + neighbor_intf, + holddowntimer, + ) logger.info("=" * 20) - logger.info("Shutting down interface %s on router %s..", - neighbor_intf, bgp_neighbor) - topotest.interface_set_status(router_list[bgp_neighbor], - neighbor_intf, - ifaceaction=False) + logger.info( + "Shutting down interface %s on router %s..", + neighbor_intf, + bgp_neighbor, + ) + topotest.interface_set_status( + router_list[bgp_neighbor], neighbor_intf, ifaceaction=False + ) # Verifying BGP neighborship is going down in holddown time - for timer in range(keepalivetimer, - (holddowntimer + keepalivetimer), - int(holddowntimer / 3)): + for timer in range( + keepalivetimer, + (holddowntimer + keepalivetimer), + int(holddowntimer / 3), + ): logger.info("Waiting for %s sec..", keepalivetimer) sleep(keepalivetimer) sleep(2) - show_bgp_json = \ - run_frr_cmd(rnode, "show bgp summary json", - isjson=True) + show_bgp_json = run_frr_cmd( + rnode, "show bgp summary json", isjson=True + ) if addr_type == "ipv4": ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] @@ -1211,22 +1247,29 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): if timer == holddowntimer: if nh_state == "Established": - errormsg = "BGP neighborship has not gone " \ - "down in {} sec for neighbor {}" \ - .format(timer, bgp_neighbor) + errormsg = ( + "BGP neighborship has not gone " + "down in {} sec for neighbor {}".format( + timer, bgp_neighbor + ) + ) return errormsg else: - logger.info("BGP neighborship has gone down in" - " %s sec for neighbor %s", - timer, bgp_neighbor) + logger.info( + "BGP neighborship has gone down in" + " %s sec for neighbor %s", + timer, + bgp_neighbor, + ) logger.debug("Exiting lib API: verify_bgp_timers_and_functionality()") return True @retry(attempts=3, wait=4, return_is_str=True) -def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, - input_dict, seq_id=None): +def verify_bgp_attributes( + tgen, addr_type, dut, static_routes, rmap_name, input_dict, seq_id=None +): """ API will verify BGP attributes set by Route-map for given prefix and DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command @@ -1288,7 +1331,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, if router != dut: continue - logger.info('Verifying BGP set attributes for dut {}:'.format(router)) + logger.info("Verifying BGP set attributes for dut {}:".format(router)) for static_route in static_routes: cmd = "show bgp {} {} json".format(addr_type, static_route) @@ -1297,8 +1340,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, dict_to_test = [] tmp_list = [] for rmap_router in input_dict.keys(): - for rmap, values in input_dict[rmap_router][ - "route_maps"].items(): + for rmap, values in input_dict[rmap_router]["route_maps"].items(): if rmap == rmap_name: dict_to_test = values for rmap_dict in values: @@ -1307,8 +1349,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, seq_id = [seq_id] if "seq_id" in rmap_dict: - rmap_seq_id = \ - rmap_dict["seq_id"] + rmap_seq_id = rmap_dict["seq_id"] for _seq_id in seq_id: if _seq_id == rmap_seq_id: tmp_list.append(rmap_dict) @@ -1318,55 +1359,56 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, for rmap_dict in dict_to_test: if "set" in rmap_dict: for criteria in rmap_dict["set"].keys(): - if criteria not in show_bgp_json[ - "paths"][0]: - errormsg = ("BGP attribute: {}" - " is not found in" - " cli: {} output " - "in router {}". - format(criteria, - cmd, - router)) + if criteria not in show_bgp_json["paths"][0]: + errormsg = ( + "BGP attribute: {}" + " is not found in" + " cli: {} output " + "in router {}".format(criteria, cmd, router) + ) return errormsg - if rmap_dict["set"][criteria] == \ - show_bgp_json["paths"][0][ - criteria]: - logger.info("Verifying BGP " - "attribute {} for" - " route: {} in " - "router: {}, found" - " expected value:" - " {}". - format(criteria, - static_route, - dut, - rmap_dict[ - "set"][ - criteria])) + if ( + rmap_dict["set"][criteria] + == show_bgp_json["paths"][0][criteria] + ): + logger.info( + "Verifying BGP " + "attribute {} for" + " route: {} in " + "router: {}, found" + " expected value:" + " {}".format( + criteria, + static_route, + dut, + rmap_dict["set"][criteria], + ) + ) else: - errormsg = \ - ("Failed: Verifying BGP " - "attribute {} for route:" - " {} in router: {}, " - " expected value: {} but" - " found: {}". - format(criteria, - static_route, - dut, - rmap_dict["set"] - [criteria], - show_bgp_json[ - 'paths'][ - 0][criteria])) + errormsg = ( + "Failed: Verifying BGP " + "attribute {} for route:" + " {} in router: {}, " + " expected value: {} but" + " found: {}".format( + criteria, + static_route, + dut, + rmap_dict["set"][criteria], + show_bgp_json["paths"][0][criteria], + ) + ) return errormsg logger.debug("Exiting lib API: verify_bgp_attributes()") return True + @retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) -def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, - attribute): +def verify_best_path_as_per_bgp_attribute( + tgen, addr_type, router, input_dict, attribute +): """ API is to verify best path according to BGP attributes for given routes. "show bgp ipv4/6 json" command will be run and verify best path according @@ -1445,38 +1487,36 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, # AS_PATH attribute if attribute == "path": # Find next_hop for the route have minimum as_path - _next_hop = min(attribute_dict, key=lambda x: len(set( - attribute_dict[x]))) + _next_hop = min( + attribute_dict, key=lambda x: len(set(attribute_dict[x])) + ) compare = "SHORTEST" # LOCAL_PREF attribute elif attribute == "locPrf": # Find next_hop for the route have highest local preference - _next_hop = max(attribute_dict, key=(lambda k: - attribute_dict[k])) + _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k])) compare = "HIGHEST" # WEIGHT attribute elif attribute == "weight": # Find next_hop for the route have highest weight - _next_hop = max(attribute_dict, key=(lambda k: - attribute_dict[k])) + _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k])) compare = "HIGHEST" # ORIGIN attribute elif attribute == "origin": # Find next_hop for the route have IGP as origin, - # - rule is IGP>EGP>INCOMPLETE - _next_hop = [key for (key, value) in - attribute_dict.iteritems() - if value == "IGP"][0] + _next_hop = [ + key for (key, value) in attribute_dict.iteritems() if value == "IGP" + ][0] compare = "" # MED attribute elif attribute == "metric": # Find next_hop for the route have LOWEST MED - _next_hop = min(attribute_dict, key=(lambda k: - attribute_dict[k])) + _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k])) compare = "LOWEST" # Show ip route @@ -1489,8 +1529,7 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, # Verifying output dictionary rib_routes_json is not empty if not bool(rib_routes_json): - errormsg = "No route found in RIB of router {}..". \ - format(router) + errormsg = "No route found in RIB of router {}..".format(router) return errormsg st_found = False @@ -1499,31 +1538,41 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, if route in rib_routes_json: st_found = True # Verify next_hop in rib_routes_json - if rib_routes_json[route][0]["nexthops"][0]["ip"] in \ - attribute_dict: + if rib_routes_json[route][0]["nexthops"][0]["ip"] in attribute_dict: nh_found = True else: - errormsg = "Incorrect Nexthop for BGP route {} in " \ - "RIB of router {}, Expected: {}, Found:" \ - " {}\n".format(route, router, - rib_routes_json[route][0][ - "nexthops"][0]["ip"], - _next_hop) + errormsg = ( + "Incorrect Nexthop for BGP route {} in " + "RIB of router {}, Expected: {}, Found:" + " {}\n".format( + route, + router, + rib_routes_json[route][0]["nexthops"][0]["ip"], + _next_hop, + ) + ) return errormsg if st_found and nh_found: logger.info( "Best path for prefix: %s with next_hop: %s is " "installed according to %s %s: (%s) in RIB of " - "router %s", route, _next_hop, compare, - attribute, attribute_dict[_next_hop], router) + "router %s", + route, + _next_hop, + compare, + attribute, + attribute_dict[_next_hop], + router, + ) logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()") return True -def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, - attribute): +def verify_best_path_as_per_admin_distance( + tgen, addr_type, router, input_dict, attribute +): """ API is to verify best path according to admin distance for given route. "show ip/ipv6 route json" command will be run and verify @@ -1574,7 +1623,8 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, for routes_from_router in input_dict.keys(): sh_ip_route_json = router_list[routes_from_router].vtysh_cmd( - command, isjson=True) + command, isjson=True + ) networks = input_dict[routes_from_router]["static_routes"] for network in networks: route = network["network"] @@ -1590,8 +1640,7 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, attribute_dict[next_hop_ip] = route_attribute["distance"] # Find next_hop for the route have LOWEST Admin Distance - _next_hop = min(attribute_dict, key=(lambda k: - attribute_dict[k])) + _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k])) compare = "LOWEST" # Show ip route @@ -1608,21 +1657,25 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, if route in rib_routes_json: st_found = True # Verify next_hop in rib_routes_json - if rib_routes_json[route][0]["nexthops"][0]["ip"] == \ - _next_hop: + if rib_routes_json[route][0]["nexthops"][0]["ip"] == _next_hop: nh_found = True else: - errormsg = ("Nexthop {} is Missing for BGP route {}" - " in RIB of router {}\n".format(_next_hop, - route, router)) + errormsg = ( + "Nexthop {} is Missing for BGP route {}" + " in RIB of router {}\n".format(_next_hop, route, router) + ) return errormsg if st_found and nh_found: - logger.info("Best path for prefix: %s is installed according" - " to %s %s: (%s) in RIB of router %s", route, - compare, attribute, - attribute_dict[_next_hop], router) - - logger.info( - "Exiting lib API: verify_best_path_as_per_admin_distance()") + logger.info( + "Best path for prefix: %s is installed according" + " to %s %s: (%s) in RIB of router %s", + route, + compare, + attribute, + attribute_dict[_next_hop], + router, + ) + + logger.info("Exiting lib API: verify_best_path_as_per_admin_distance()") return True |
