summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/bgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/bgp.py')
-rw-r--r--tests/topotests/lib/bgp.py264
1 files changed, 264 insertions, 0 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index a4b65cb987..34b48eed12 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -1256,3 +1256,267 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
logger.info("Exiting lib API: verify_bgp_timers_and_functionality()")
return True
+
+def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
+ attribute):
+ """
+ API is to verify best path according to BGP attributes for given routes.
+ "show bgp ipv4/6 json" command will be run and verify best path according
+ to shortest as-path, highest local-preference and med, lowest weight and
+ route origin IGP>EGP>INCOMPLETE.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `tgen` : topogen object
+ * `attribute` : calculate best path using this attribute
+ * `input_dict`: defines different routes to calculate for which route
+ best path is selected
+
+ Usage
+ -----
+ # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
+ router r7 to router r1(DUT) as per shortest as-path attribute
+ input_dict = {
+ "r7": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ attribute = "localpref"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
+ input_dict, attribute)
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_best_path_as_per_bgp_attribute()")
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ # TODO get addr_type from address
+ # Verifying show bgp json
+ command = "show bgp {} json".format(addr_type)
+
+ sleep(2)
+ logger.info("Verifying router %s RIB for best path:", router)
+ sh_ip_bgp_json = rnode.vtysh_cmd(command, isjson=True)
+
+ for route_val in input_dict.values():
+ net_data = route_val["bgp"]["address_family"]["ipv4"]["unicast"]
+ networks = net_data["advertise_networks"]
+ for network in networks:
+ route = network["network"]
+
+ route_attributes = sh_ip_bgp_json["routes"][route]
+ _next_hop = None
+ compare = None
+ attribute_dict = {}
+ for route_attribute in route_attributes:
+ next_hops = route_attribute["nexthops"]
+ for next_hop in next_hops:
+ next_hop_ip = next_hop["ip"]
+ attribute_dict[next_hop_ip] = route_attribute[attribute]
+
+ # AS_PATH attribute
+ if attribute == "aspath":
+ # Find next_hop for the route have minimum as_path
+ _next_hop = min(attribute_dict, key=lambda x: len(set(
+ attribute_dict[x])))
+ compare = "SHORTEST"
+
+ # LOCAL_PREF attribute
+ elif attribute == "localpref":
+ # Find next_hop for the route have highest local preference
+ _next_hop = max(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "HIGHEST"
+
+ # WEIGHT attribute
+ elif attribute == "weight":
+ # Find next_hop for the route have highest weight
+ _next_hop = max(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "HIGHEST"
+
+ # ORIGIN attribute
+ elif attribute == "origin":
+ # Find next_hop for the route have IGP as origin, -
+ # - rule is IGP>EGP>INCOMPLETE
+ _next_hop = [key for (key, value) in
+ attribute_dict.iteritems()
+ if value == "IGP"][0]
+ compare = ""
+
+ # MED attribute
+ elif attribute == "med":
+ # Find next_hop for the route have LOWEST MED
+ _next_hop = min(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "LOWEST"
+
+ # Show ip route
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if not bool(rib_routes_json):
+ errormsg = "No route found in RIB of router {}..". \
+ format(router)
+ return errormsg
+
+ st_found = False
+ nh_found = False
+ # Find best is installed in RIB
+ if route in rib_routes_json:
+ st_found = True
+ # Verify next_hop in rib_routes_json
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] == \
+ _next_hop:
+ nh_found = True
+ else:
+ errormsg = "Incorrect Nexthop for BGP route {} in " \
+ "RIB of router {}, Expected: {}, Found:" \
+ " {}\n".format(route, router,
+ rib_routes_json[route][0][
+ "nexthops"][0]["ip"],
+ _next_hop)
+ return errormsg
+
+ if st_found and nh_found:
+ logger.info(
+ "Best path for prefix: %s with next_hop: %s is "
+ "installed according to %s %s: (%s) in RIB of "
+ "router %s", route, _next_hop, compare,
+ attribute, attribute_dict[_next_hop], router)
+
+ logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()")
+ return True
+
+
+def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
+ attribute):
+ """
+ API is to verify best path according to admin distance for given
+ route. "show ip/ipv6 route json" command will be run and verify
+ best path accoring to shortest admin distanc.
+
+ Parameters
+ ----------
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test
+ * `tgen` : topogen object
+ * `attribute` : calculate best path using admin distance
+ * `input_dict`: defines different routes with different admin distance
+ to calculate for which route best path is selected
+ Usage
+ -----
+ # To verify best path for route 200.50.2.0/32 from router r2 to
+ router r1(DUT) as per shortest admin distance which is 60.
+ input_dict = {
+ "r2": {
+ "static_routes": [{"network": "200.50.2.0/32", \
+ "admin_distance": 80, "next_hop": "10.0.0.14"},
+ {"network": "200.50.2.0/32", \
+ "admin_distance": 60, "next_hop": "10.0.0.18"}]
+ }}
+ attribute = "localpref"
+ result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
+ input_dict, attribute):
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_best_path_as_per_admin_distance()")
+ router_list = tgen.routers()
+ if router not in router_list:
+ return False
+
+ rnode = tgen.routers()[router]
+
+ sleep(2)
+ logger.info("Verifying router %s RIB for best path:", router)
+
+ # Show ip route cmd
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+
+ for routes_from_router in input_dict.keys():
+ sh_ip_route_json = router_list[routes_from_router].vtysh_cmd(
+ command, isjson=True)
+ networks = input_dict[routes_from_router]["static_routes"]
+ for network in networks:
+ route = network["network"]
+
+ route_attributes = sh_ip_route_json[route]
+ _next_hop = None
+ compare = None
+ attribute_dict = {}
+ for route_attribute in route_attributes:
+ next_hops = route_attribute["nexthops"]
+ for next_hop in next_hops:
+ next_hop_ip = next_hop["ip"]
+ attribute_dict[next_hop_ip] = route_attribute["distance"]
+
+ # Find next_hop for the route have LOWEST Admin Distance
+ _next_hop = min(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "LOWEST"
+
+ # Show ip route
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if not bool(rib_routes_json):
+ errormsg = "No route found in RIB of router {}..".format(router)
+ return errormsg
+
+ st_found = False
+ nh_found = False
+ # Find best is installed in RIB
+ if route in rib_routes_json:
+ st_found = True
+ # Verify next_hop in rib_routes_json
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] == \
+ _next_hop:
+ nh_found = True
+ else:
+ errormsg = ("Nexthop {} is Missing for BGP route {}"
+ " in RIB of router {}\n".format(_next_hop,
+ route, router))
+ return errormsg
+
+ if st_found and nh_found:
+ logger.info("Best path for prefix: %s is installed according"
+ " to %s %s: (%s) in RIB of router %s", route,
+ compare, attribute,
+ attribute_dict[_next_hop], router)
+
+ logger.info(
+ "Exiting lib API: verify_best_path_as_per_admin_distance()")
+ return True