diff options
Diffstat (limited to 'tests/topotests/lib/bgp.py')
| -rw-r--r-- | tests/topotests/lib/bgp.py | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index a4b65cb987..34b48eed12 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -1256,3 +1256,267 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): logger.info("Exiting lib API: verify_bgp_timers_and_functionality()") return True + +def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, + attribute): + """ + API is to verify best path according to BGP attributes for given routes. + "show bgp ipv4/6 json" command will be run and verify best path according + to shortest as-path, highest local-preference and med, lowest weight and + route origin IGP>EGP>INCOMPLETE. + + Parameters + ---------- + * `tgen` : topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `tgen` : topogen object + * `attribute` : calculate best path using this attribute + * `input_dict`: defines different routes to calculate for which route + best path is selected + + Usage + ----- + # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from + router r7 to router r1(DUT) as per shortest as-path attribute + input_dict = { + "r7": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + } + } + attribute = "localpref" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \ + input_dict, attribute) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_best_path_as_per_bgp_attribute()") + if router not in tgen.routers(): + return False + + rnode = tgen.routers()[router] + + # TODO get addr_type from address + # Verifying show bgp json + command = "show bgp {} json".format(addr_type) + + sleep(2) + logger.info("Verifying router %s RIB for best path:", router) + sh_ip_bgp_json = rnode.vtysh_cmd(command, isjson=True) + + for route_val in input_dict.values(): + net_data = route_val["bgp"]["address_family"]["ipv4"]["unicast"] + networks = net_data["advertise_networks"] + for network in networks: + route = network["network"] + + route_attributes = sh_ip_bgp_json["routes"][route] + _next_hop = None + compare = None + attribute_dict = {} + for route_attribute in route_attributes: + next_hops = route_attribute["nexthops"] + for next_hop in next_hops: + next_hop_ip = next_hop["ip"] + attribute_dict[next_hop_ip] = route_attribute[attribute] + + # AS_PATH attribute + if attribute == "aspath": + # Find next_hop for the route have minimum as_path + _next_hop = min(attribute_dict, key=lambda x: len(set( + attribute_dict[x]))) + compare = "SHORTEST" + + # LOCAL_PREF attribute + elif attribute == "localpref": + # Find next_hop for the route have highest local preference + _next_hop = max(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "HIGHEST" + + # WEIGHT attribute + elif attribute == "weight": + # Find next_hop for the route have highest weight + _next_hop = max(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "HIGHEST" + + # ORIGIN attribute + elif attribute == "origin": + # Find next_hop for the route have IGP as origin, - + # - rule is IGP>EGP>INCOMPLETE + _next_hop = [key for (key, value) in + attribute_dict.iteritems() + if value == "IGP"][0] + compare = "" + + # MED attribute + elif attribute == "med": + # Find next_hop for the route have LOWEST MED + _next_hop = min(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "LOWEST" + + # Show ip route + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + + rib_routes_json = rnode.vtysh_cmd(command, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if not bool(rib_routes_json): + errormsg = "No route found in RIB of router {}..". \ + format(router) + return errormsg + + st_found = False + nh_found = False + # Find best is installed in RIB + if route in rib_routes_json: + st_found = True + # Verify next_hop in rib_routes_json + if rib_routes_json[route][0]["nexthops"][0]["ip"] == \ + _next_hop: + nh_found = True + else: + errormsg = "Incorrect Nexthop for BGP route {} in " \ + "RIB of router {}, Expected: {}, Found:" \ + " {}\n".format(route, router, + rib_routes_json[route][0][ + "nexthops"][0]["ip"], + _next_hop) + return errormsg + + if st_found and nh_found: + logger.info( + "Best path for prefix: %s with next_hop: %s is " + "installed according to %s %s: (%s) in RIB of " + "router %s", route, _next_hop, compare, + attribute, attribute_dict[_next_hop], router) + + logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()") + return True + + +def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, + attribute): + """ + API is to verify best path according to admin distance for given + route. "show ip/ipv6 route json" command will be run and verify + best path accoring to shortest admin distanc. + + Parameters + ---------- + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test + * `tgen` : topogen object + * `attribute` : calculate best path using admin distance + * `input_dict`: defines different routes with different admin distance + to calculate for which route best path is selected + Usage + ----- + # To verify best path for route 200.50.2.0/32 from router r2 to + router r1(DUT) as per shortest admin distance which is 60. + input_dict = { + "r2": { + "static_routes": [{"network": "200.50.2.0/32", \ + "admin_distance": 80, "next_hop": "10.0.0.14"}, + {"network": "200.50.2.0/32", \ + "admin_distance": 60, "next_hop": "10.0.0.18"}] + }} + attribute = "localpref" + result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \ + input_dict, attribute): + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_best_path_as_per_admin_distance()") + router_list = tgen.routers() + if router not in router_list: + return False + + rnode = tgen.routers()[router] + + sleep(2) + logger.info("Verifying router %s RIB for best path:", router) + + # Show ip route cmd + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + + for routes_from_router in input_dict.keys(): + sh_ip_route_json = router_list[routes_from_router].vtysh_cmd( + command, isjson=True) + networks = input_dict[routes_from_router]["static_routes"] + for network in networks: + route = network["network"] + + route_attributes = sh_ip_route_json[route] + _next_hop = None + compare = None + attribute_dict = {} + for route_attribute in route_attributes: + next_hops = route_attribute["nexthops"] + for next_hop in next_hops: + next_hop_ip = next_hop["ip"] + attribute_dict[next_hop_ip] = route_attribute["distance"] + + # Find next_hop for the route have LOWEST Admin Distance + _next_hop = min(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "LOWEST" + + # Show ip route + rib_routes_json = rnode.vtysh_cmd(command, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if not bool(rib_routes_json): + errormsg = "No route found in RIB of router {}..".format(router) + return errormsg + + st_found = False + nh_found = False + # Find best is installed in RIB + if route in rib_routes_json: + st_found = True + # Verify next_hop in rib_routes_json + if rib_routes_json[route][0]["nexthops"][0]["ip"] == \ + _next_hop: + nh_found = True + else: + errormsg = ("Nexthop {} is Missing for BGP route {}" + " in RIB of router {}\n".format(_next_hop, + route, router)) + return errormsg + + if st_found and nh_found: + logger.info("Best path for prefix: %s is installed according" + " to %s %s: (%s) in RIB of router %s", route, + compare, attribute, + attribute_dict[_next_hop], router) + + logger.info( + "Exiting lib API: verify_best_path_as_per_admin_distance()") + return True |
