diff options
Diffstat (limited to 'tests/topotests/lib/bgp.py')
| -rw-r--r-- | tests/topotests/lib/bgp.py | 193 |
1 files changed, 104 insertions, 89 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index ac834bcf4d..d05332388e 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -1227,15 +1227,14 @@ def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True): logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) tgen = get_topogen() for router, rnode in tgen.routers().items(): - if 'bgp' not in topo['routers'][router]: + if "bgp" not in topo["routers"][router]: continue if dut is not None and dut != router: continue logger.info("Verifying BGP Convergence on router %s:", router) - show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", - isjson=True) + show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True) # Verifying output dictionary show_bgp_json is empty or not if not bool(show_bgp_json): errormsg = "BGP is not running" @@ -1272,39 +1271,43 @@ def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True): data = topo["routers"][bgp_neighbor]["links"] for dest_link in dest_link_dict.keys(): if dest_link in data: - peer_details = \ - peer_data[_addr_type][dest_link] + peer_details = peer_data[_addr_type][dest_link] - neighbor_ip = \ - data[dest_link][_addr_type].split( - "/")[0] + neighbor_ip = data[dest_link][_addr_type].split("/")[0] nh_state = None - if "ipv4Unicast" in show_bgp_json[vrf] or \ - "ipv6Unicast" in show_bgp_json[vrf]: - errormsg = ("[DUT: %s] VRF: %s, " - "ipv4Unicast/ipv6Unicast" - " address-family present" - " under l2vpn" % (router, - vrf)) + if ( + "ipv4Unicast" in show_bgp_json[vrf] + or "ipv6Unicast" in show_bgp_json[vrf] + ): + errormsg = ( + "[DUT: %s] VRF: %s, " + "ipv4Unicast/ipv6Unicast" + " address-family present" + " under l2vpn" % (router, vrf) + ) return errormsg - l2VpnEvpn_data = \ - show_bgp_json[vrf]["l2VpnEvpn"][ - "peers"] - nh_state = \ - l2VpnEvpn_data[neighbor_ip]["state"] + l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][ + "peers" + ] + nh_state = l2VpnEvpn_data[neighbor_ip]["state"] if nh_state == "Established": no_of_evpn_peer += 1 if no_of_evpn_peer == total_evpn_peer: - logger.info("[DUT: %s] VRF: %s, BGP is Converged for " - "epvn peers", router, vrf) + logger.info( + "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers", + router, + vrf, + ) result = True else: - errormsg = ("[DUT: %s] VRF: %s, BGP is not converged " - "for evpn peers" % (router, vrf)) + errormsg = ( + "[DUT: %s] VRF: %s, BGP is not converged " + "for evpn peers" % (router, vrf) + ) return errormsg else: total_peer = 0 @@ -1312,76 +1315,72 @@ def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True): if not check_address_types(addr_type): continue - bgp_neighbors = \ - bgp_addr_type[addr_type]["unicast"]["neighbor"] + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] for bgp_neighbor in bgp_neighbors: - total_peer += \ - len(bgp_neighbors[bgp_neighbor]["dest_link"]) + total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"]) no_of_peer = 0 for addr_type in bgp_addr_type.keys(): if not check_address_types(addr_type): continue - bgp_neighbors = \ - bgp_addr_type[addr_type]["unicast"]["neighbor"] + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] for bgp_neighbor, peer_data in bgp_neighbors.items(): - for dest_link in peer_data["dest_link"].\ - keys(): - data = \ - topo["routers"][bgp_neighbor]["links"] - if dest_link in data: - peer_details = \ - peer_data['dest_link'][dest_link] - # for link local neighbors - if "neighbor_type" in peer_details and \ - peer_details["neighbor_type"] == \ - 'link-local': - intf = topo["routers"][bgp_neighbor][ - "links"][dest_link]["interface"] - neighbor_ip = get_frr_ipv6_linklocal( - tgen, bgp_neighbor, intf) - elif "source_link" in peer_details: - neighbor_ip = \ - topo["routers"][bgp_neighbor][ - "links"][peer_details[ - 'source_link']][ - addr_type].\ - split("/")[0] - elif "neighbor_type" in peer_details and \ - peer_details["neighbor_type"] == \ - 'unnumbered': - neighbor_ip = \ - data[dest_link]["peer-interface"] - else: - neighbor_ip = \ - data[dest_link][addr_type].split( - "/")[0] - nh_state = None - neighbor_ip = neighbor_ip.lower() - if addr_type == "ipv4": - ipv4_data = show_bgp_json[vrf][ - "ipv4Unicast"]["peers"] - nh_state = \ - ipv4_data[neighbor_ip]["state"] - else: - ipv6_data = show_bgp_json[vrf][ - "ipv6Unicast"]["peers"] - if neighbor_ip in ipv6_data: - nh_state = \ - ipv6_data[neighbor_ip]["state"] + for dest_link in peer_data["dest_link"].keys(): + data = topo["routers"][bgp_neighbor]["links"] + if dest_link in data: + peer_details = peer_data["dest_link"][dest_link] + # for link local neighbors + if ( + "neighbor_type" in peer_details + and peer_details["neighbor_type"] == "link-local" + ): + intf = topo["routers"][bgp_neighbor]["links"][ + dest_link + ]["interface"] + neighbor_ip = get_frr_ipv6_linklocal( + tgen, bgp_neighbor, intf + ) + elif "source_link" in peer_details: + neighbor_ip = topo["routers"][bgp_neighbor][ + "links" + ][peer_details["source_link"]][addr_type].split( + "/" + )[ + 0 + ] + elif ( + "neighbor_type" in peer_details + and peer_details["neighbor_type"] == "unnumbered" + ): + neighbor_ip = data[dest_link]["peer-interface"] + else: + neighbor_ip = data[dest_link][addr_type].split("/")[ + 0 + ] + nh_state = None + neighbor_ip = neighbor_ip.lower() + if addr_type == "ipv4": + ipv4_data = show_bgp_json[vrf]["ipv4Unicast"][ + "peers" + ] + nh_state = ipv4_data[neighbor_ip]["state"] + else: + ipv6_data = show_bgp_json[vrf]["ipv6Unicast"][ + "peers" + ] + if neighbor_ip in ipv6_data: + nh_state = ipv6_data[neighbor_ip]["state"] - if nh_state == "Established": - no_of_peer += 1 + if nh_state == "Established": + no_of_peer += 1 if no_of_peer == total_peer and no_of_peer > 0: - logger.info("[DUT: %s] VRF: %s, BGP is Converged", - router, vrf) + logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf) result = True else: - errormsg = ("[DUT: %s] VRF: %s, BGP is not converged" - % (router, vrf)) + errormsg = "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf) return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) @@ -1390,7 +1389,14 @@ def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True): @retry(retry_timeout=16) def verify_bgp_community( - tgen, addr_type, router, network, input_dict=None, vrf=None, bestpath=False, expected=True + tgen, + addr_type, + router, + network, + input_dict=None, + vrf=None, + bestpath=False, + expected=True, ): """ API to veiryf BGP large community is attached in route for any given @@ -2216,7 +2222,7 @@ def verify_bgp_attributes( input_dict=None, seq_id=None, nexthop=None, - expected=True + expected=True, ): """ API will verify BGP attributes set by Route-map for given prefix and @@ -2668,7 +2674,14 @@ def verify_best_path_as_per_admin_distance( @retry(retry_timeout=10, initial_wait=2) def verify_bgp_rib( - tgen, addr_type, dut, input_dict, next_hop=None, aspath=None, multi_nh=None, expected=True + tgen, + addr_type, + dut, + input_dict, + next_hop=None, + aspath=None, + multi_nh=None, + expected=True, ): """ This API is to verify whether bgp rib has any @@ -2970,7 +2983,9 @@ def verify_bgp_rib( @retry(retry_timeout=10) -def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer, expected=True): +def verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut, peer, expected=True +): """ This API is to verify verify_graceful_restart configuration of DUT and cross verify the same from the peer bgp routerrouter. @@ -3772,7 +3787,9 @@ def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer) @retry(retry_timeout=8) -def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut, peer, expected=True): +def verify_gr_address_family( + tgen, topo, addr_type, addr_family, dut, peer, expected=True +): """ This API is to verify gr_address_family in the BGP gr capability advertised by the neighbor router @@ -3830,9 +3847,7 @@ def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut, peer, expe show_bgp_graceful_json = run_frr_cmd( rnode, - "show bgp {} neighbor {} graceful-restart json".format( - addr_type, neighbor_ip - ), + "show bgp {} neighbor {} graceful-restart json".format(addr_type, neighbor_ip), isjson=True, ) @@ -3880,7 +3895,7 @@ def verify_attributes_for_evpn_routes( ipLen=None, rd_peer=None, rt_peer=None, - expected=True + expected=True, ): """ API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1" |
