summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/bgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/bgp.py')
-rw-r--r--tests/topotests/lib/bgp.py662
1 files changed, 477 insertions, 185 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 2dd90e9a86..69c807f300 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -75,8 +75,12 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
"address_family": {
"ipv4": {
"unicast": {
- "redistribute": [
- {"redist_type": "static"},
+ "redistribute": [{
+ "redist_type": "static",
+ "attribute": {
+ "metric" : 123
+ }
+ },
{"redist_type": "connected"}
],
"advertise_networks": [
@@ -143,50 +147,55 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
logger.debug("Router %s: 'bgp' not present in input_dict", router)
continue
- data_all_bgp = __create_bgp_global(tgen, input_dict, router, build)
- if data_all_bgp:
- bgp_data = input_dict[router]["bgp"]
+ bgp_data_list = input_dict[router]["bgp"]
- bgp_addr_data = bgp_data.setdefault("address_family", {})
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
- if not bgp_addr_data:
- logger.debug(
- "Router %s: 'address_family' not present in " "input_dict for BGP",
- router,
- )
- else:
+ for bgp_data in bgp_data_list:
+ data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
+ if data_all_bgp:
+ bgp_addr_data = bgp_data.setdefault("address_family", {})
- ipv4_data = bgp_addr_data.setdefault("ipv4", {})
- ipv6_data = bgp_addr_data.setdefault("ipv6", {})
+ if not bgp_addr_data:
+ logger.debug(
+ "Router %s: 'address_family' not present in "
+ "input_dict for BGP",
+ router,
+ )
+ else:
- neigh_unicast = (
- True
- if ipv4_data.setdefault("unicast", {})
- or ipv6_data.setdefault("unicast", {})
- else False
- )
+ ipv4_data = bgp_addr_data.setdefault("ipv4", {})
+ ipv6_data = bgp_addr_data.setdefault("ipv6", {})
- if neigh_unicast:
- data_all_bgp = __create_bgp_unicast_neighbor(
- tgen,
- topo,
- input_dict,
- router,
- afi_test,
- config_data=data_all_bgp,
+ neigh_unicast = (
+ True
+ if ipv4_data.setdefault("unicast", {})
+ or ipv6_data.setdefault("unicast", {})
+ else False
)
- try:
- result = create_common_configuration(
- tgen, router, data_all_bgp, "bgp", build, load_config
- )
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ if neigh_unicast:
+ data_all_bgp = __create_bgp_unicast_neighbor(
+ tgen,
+ topo,
+ bgp_data,
+ router,
+ afi_test,
+ config_data=data_all_bgp,
+ )
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ try:
+ result = create_common_configuration(
+ tgen, router, data_all_bgp, "bgp", build, load_config
+ )
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_router_bgp()")
return result
@@ -206,19 +215,16 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
True or False
"""
+ result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- bgp_data = input_dict[router]["bgp"]
+ bgp_data = input_dict
del_bgp_action = bgp_data.setdefault("delete", False)
- if del_bgp_action:
- config_data = ["no router bgp"]
-
- return config_data
config_data = []
if "local_as" not in bgp_data and build:
- logger.error(
+ logger.debug(
"Router %s: 'local_as' not present in input_dict" "for BGP", router
)
return False
@@ -229,6 +235,12 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
if vrf_id:
cmd = "{} vrf {}".format(cmd, vrf_id)
+ if del_bgp_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ return config_data
+
config_data.append(cmd)
config_data.append("no bgp ebgp-requires-policy")
@@ -325,12 +337,15 @@ def __create_bgp_unicast_neighbor(
* `build` : Only for initial setup phase this is set as True.
"""
+ result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
add_neigh = True
+ bgp_data = input_dict
if "router bgp" in config_data:
add_neigh = False
- bgp_data = input_dict[router]["bgp"]["address_family"]
+
+ bgp_data = input_dict["address_family"]
for addr_type, addr_dict in bgp_data.iteritems():
if not addr_dict:
@@ -403,14 +418,19 @@ def __create_bgp_unicast_neighbor(
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
- logger.error(
+ logger.debug(
"Router %s: 'redist_type' not present in " "input_dict", router
)
else:
cmd = "redistribute {}".format(redistribute["redist_type"])
redist_attr = redistribute.setdefault("attribute", None)
if redist_attr:
- cmd = "{} {}".format(cmd, redist_attr)
+ if isinstance(redist_attr, dict):
+ for key, value in redist_attr.items():
+ cmd = "{} {} {}".format(cmd, key, value)
+ else:
+ cmd = "{} {}".format(cmd, redist_attr)
+
del_action = redistribute.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
@@ -453,13 +473,18 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
config_data = []
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- bgp_data = input_dict[router]["bgp"]["address_family"]
+ bgp_data = input_dict["address_family"]
neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
for name, peer_dict in neigh_data.iteritems():
for dest_link, peer in peer_dict["dest_link"].iteritems():
nh_details = topo[name]
- remote_as = nh_details["bgp"]["local_as"]
+
+ if "vrfs" in topo[router]:
+ remote_as = nh_details["bgp"][0]["local_as"]
+ else:
+ remote_as = nh_details["bgp"]["local_as"]
+
update_source = None
if dest_link in nh_details["links"].keys():
@@ -549,7 +574,7 @@ def __create_bgp_unicast_address_family(
config_data = []
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- bgp_data = input_dict[router]["bgp"]["address_family"]
+ bgp_data = input_dict["address_family"]
neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
for peer_name, peer_dict in deepcopy(neigh_data).iteritems():
@@ -605,8 +630,12 @@ def __create_bgp_unicast_address_family(
allowas_in = peer.setdefault("allowas-in", None)
# next-hop-self
- if next_hop_self:
- config_data.append("{} next-hop-self".format(neigh_cxt))
+ if next_hop_self is not None:
+ if next_hop_self is True:
+ config_data.append("{} next-hop-self".format(neigh_cxt))
+ else:
+ config_data.append("no {} next-hop-self".format(neigh_cxt))
+
# send_community
if send_community:
config_data.append("{} send-community".format(neigh_cxt))
@@ -840,77 +869,288 @@ def verify_router_id(tgen, topo, input_dict):
@retry(attempts=20, wait=2, return_is_str=True)
-def verify_bgp_convergence(tgen, topo):
+def verify_bgp_convergence(tgen, topo, dut=None):
"""
API will verify if BGP is converged with in the given time frame.
Running "show bgp summary json" command and verify bgp neighbor
state is established,
+
Parameters
----------
* `tgen`: topogen object
* `topo`: input json file data
- * `addr_type`: ip_type, ipv4/ipv6
+ * `dut`: device under test
+
Usage
-----
# To veriry is BGP is converged for all the routers used in
topology
- results = verify_bgp_convergence(tgen, topo, "ipv4")
+ results = verify_bgp_convergence(tgen, topo, dut="r1")
+
Returns
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Entering lib API: verify_bgp_convergence()")
for router, rnode in tgen.routers().iteritems():
if "bgp" not in topo["routers"][router]:
continue
- logger.info("Verifying BGP Convergence on router %s", router)
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying BGP Convergence on router %s:", router)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
# To find neighbor ip type
- bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
- for addr_type in bgp_addr_type.keys():
- if not check_address_types(addr_type):
- continue
- total_peer = 0
+ bgp_data_list = topo["routers"][router]["bgp"]
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
- for bgp_neighbor in bgp_neighbors:
- total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+ for bgp_data in bgp_data_list:
+ if "vrf" in bgp_data:
+ vrf = bgp_data["vrf"]
+ if vrf is None:
+ vrf = "default"
+ else:
+ vrf = "default"
- for addr_type in bgp_addr_type.keys():
- if not check_address_types(addr_type):
- continue
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ # To find neighbor ip type
+ bgp_addr_type = bgp_data["address_family"]
+ if "l2vpn" in bgp_addr_type:
+ total_evpn_peer = 0
- no_of_peer = 0
- for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
- for dest_link in peer_data["dest_link"].keys():
- data = topo["routers"][bgp_neighbor]["links"]
- if dest_link in data:
- neighbor_ip = data[dest_link][addr_type].split("/")[0]
- if addr_type == "ipv4":
- ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
- nh_state = ipv4_data[neighbor_ip]["state"]
- else:
- ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
- nh_state = ipv6_data[neighbor_ip]["state"]
+ if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]:
+ continue
- if nh_state == "Established":
- no_of_peer += 1
- if no_of_peer == total_peer:
- logger.info("BGP is Converged for router %s", router)
+ bgp_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"]
+ total_evpn_peer += len(bgp_neighbors)
+
+ no_of_evpn_peer = 0
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ for _addr_type, dest_link_dict in peer_data.items():
+ data = topo["routers"][bgp_neighbor]["links"]
+ for dest_link in dest_link_dict.keys():
+ if dest_link in data:
+ peer_details = peer_data[_addr_type][dest_link]
+
+ neighbor_ip = data[dest_link][_addr_type].split("/")[0]
+ nh_state = None
+
+ if (
+ "ipv4Unicast" in show_bgp_json[vrf]
+ or "ipv6Unicast" in show_bgp_json[vrf]
+ ):
+ errormsg = (
+ "[DUT: %s] VRF: %s, "
+ "ipv4Unicast/ipv6Unicast"
+ " address-family present"
+ " under l2vpn" % (router, vrf)
+ )
+ return errormsg
+
+ l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][
+ "peers"
+ ]
+ nh_state = l2VpnEvpn_data[neighbor_ip]["state"]
+
+ if nh_state == "Established":
+ no_of_evpn_peer += 1
+
+ if no_of_evpn_peer == total_evpn_peer:
+ logger.info(
+ "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers",
+ router,
+ vrf,
+ )
+ else:
+ errormsg = (
+ "[DUT: %s] VRF: %s, BGP is not converged "
+ "for evpn peers" % (router, vrf)
+ )
+ return errormsg
+ else:
+ for addr_type in bgp_addr_type.keys():
+ if not check_address_types(addr_type):
+ continue
+ total_peer = 0
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ for addr_type in bgp_addr_type.keys():
+ if not check_address_types(addr_type):
+ continue
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ no_of_peer = 0
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ for dest_link in peer_data["dest_link"].keys():
+ data = topo["routers"][bgp_neighbor]["links"]
+ if dest_link in data:
+ peer_details = peer_data["dest_link"][dest_link]
+ # for link local neighbors
+ if (
+ "neighbor_type" in peer_details
+ and peer_details["neighbor_type"] == "link-local"
+ ):
+ neighbor_ip = get_ipv6_linklocal_address(
+ topo["routers"], bgp_neighbor, dest_link
+ )
+ elif "source_link" in peer_details:
+ neighbor_ip = topo["routers"][bgp_neighbor][
+ "links"
+ ][peer_details["source_link"]][addr_type].split(
+ "/"
+ )[
+ 0
+ ]
+ elif (
+ "neighbor_type" in peer_details
+ and peer_details["neighbor_type"] == "unnumbered"
+ ):
+ neighbor_ip = data[dest_link]["peer-interface"]
+ else:
+ neighbor_ip = data[dest_link][addr_type].split("/")[
+ 0
+ ]
+ nh_state = None
+
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json[vrf]["ipv4Unicast"][
+ "peers"
+ ]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json[vrf]["ipv6Unicast"][
+ "peers"
+ ]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
+ else:
+ errormsg = "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)
+ return errormsg
+
+ logger.debug("Exiting API: verify_bgp_convergence()")
+ return True
+
+
+@retry(attempts=3, wait=4, return_is_str=True)
+def verify_bgp_community(
+ tgen, addr_type, router, network, input_dict=None, vrf=None, bestpath=False
+):
+ """
+ API to veiryf BGP large community is attached in route for any given
+ DUT by running "show bgp ipv4/6 {route address} json" command.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test
+ * `network`: network for which set criteria needs to be verified
+ * `input_dict`: having details like - for which router, community and
+ values needs to be verified
+ * `vrf`: VRF name
+ * `bestpath`: To check best path cli
+
+ Usage
+ -----
+ networks = ["200.50.2.0/32"]
+ input_dict = {
+ "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
+ }
+ result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_bgp_community()")
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ logger.info(
+ "Verifying BGP community attributes on dut %s: for %s " "network %s",
+ router,
+ addr_type,
+ network,
+ )
+
+ command = "show bgp"
+
+ sleep(5)
+ for net in network:
+ if vrf:
+ cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net)
+ elif bestpath:
+ cmd = "{} {} {} bestpath json".format(command, addr_type, net)
else:
- errormsg = "BGP is not converged for router {}".format(router)
+ cmd = "{} {} {} json".format(command, addr_type, net)
+
+ show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
+ if "paths" not in show_bgp_json:
+ return "Prefix {} not found in BGP table of router: {}".format(net, router)
+
+ as_paths = show_bgp_json["paths"]
+ found = False
+ for i in range(len(as_paths)):
+ if (
+ "largeCommunity" in show_bgp_json["paths"][i]
+ or "community" in show_bgp_json["paths"][i]
+ ):
+ found = True
+ logger.info(
+ "Large Community attribute is found for route:" " %s in router: %s",
+ net,
+ router,
+ )
+ if input_dict is not None:
+ for criteria, comm_val in input_dict.items():
+ show_val = show_bgp_json["paths"][i][criteria]["string"]
+ if comm_val == show_val:
+ logger.info(
+ "Verifying BGP %s for prefix: %s"
+ " in router: %s, found expected"
+ " value: %s",
+ criteria,
+ net,
+ router,
+ comm_val,
+ )
+ else:
+ errormsg = (
+ "Failed: Verifying BGP attribute"
+ " {} for route: {} in router: {}"
+ ", expected value: {} but found"
+ ": {}".format(criteria, net, router, comm_val, show_val)
+ )
+ return errormsg
+
+ if not found:
+ errormsg = (
+ "Large Community attribute is not found for route: "
+ "{} in router: {} ".format(net, router)
+ )
return errormsg
- logger.debug("Exiting API: verify_bgp_convergence()")
+ logger.debug("Exiting lib API: verify_bgp_community()")
return True
@@ -1090,6 +1330,7 @@ def clear_bgp(tgen, addr_type, router, vrf=None):
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
if router not in tgen.routers():
return False
@@ -1102,9 +1343,21 @@ def clear_bgp(tgen, addr_type, router, vrf=None):
# Clearing BGP
logger.info("Clearing BGP neighborship for router %s..", router)
if addr_type == "ipv4":
- run_frr_cmd(rnode, "clear ip bgp *")
+ if vrf:
+ for _vrf in vrf:
+ run_frr_cmd(rnode, "clear ip bgp vrf {} *".format(_vrf))
+ else:
+ run_frr_cmd(rnode, "clear ip bgp *")
elif addr_type == "ipv6":
- run_frr_cmd(rnode, "clear bgp ipv6 *")
+ if vrf:
+ for _vrf in vrf:
+ run_frr_cmd(rnode, "clear bgp vrf {} ipv6 *".format(_vrf))
+ else:
+ run_frr_cmd(rnode, "clear bgp ipv6 *")
+ else:
+ run_frr_cmd(rnode, "clear bgp *")
+
+ sleep(5)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
@@ -1698,7 +1951,6 @@ def verify_best_path_as_per_bgp_attribute(
"show bgp ipv4/6 json" command will be run and verify best path according
to shortest as-path, highest local-preference and med, lowest weight and
route origin IGP>EGP>INCOMPLETE.
-
Parameters
----------
* `tgen` : topogen object
@@ -1707,7 +1959,6 @@ def verify_best_path_as_per_bgp_attribute(
* `attribute` : calculate best path using this attribute
* `input_dict`: defines different routes to calculate for which route
best path is selected
-
Usage
-----
# To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
@@ -1741,114 +1992,155 @@ def verify_best_path_as_per_bgp_attribute(
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
if router not in tgen.routers():
return False
rnode = tgen.routers()[router]
- command = "show bgp {} json".format(addr_type)
+ # Verifying show bgp json
+ command = "show bgp"
- sleep(5)
+ sleep(2)
logger.info("Verifying router %s RIB for best path:", router)
- sh_ip_bgp_json = run_frr_cmd(rnode, command, isjson=True)
+ static_route = False
+ advertise_network = False
for route_val in input_dict.values():
- net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
- networks = net_data["advertise_networks"]
- for network in networks:
- route = network["network"]
+ if "static_routes" in route_val:
+ static_route = True
+ networks = route_val["static_routes"]
+ else:
+ advertise_network = True
+ net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
+ networks = net_data["advertise_networks"]
- route_attributes = sh_ip_bgp_json["routes"][route]
- _next_hop = None
- compare = None
- attribute_dict = {}
- for route_attribute in route_attributes:
- next_hops = route_attribute["nexthops"]
- for next_hop in next_hops:
- next_hop_ip = next_hop["ip"]
- attribute_dict[next_hop_ip] = route_attribute[attribute]
+ for network in networks:
+ _network = network["network"]
+ no_of_ip = network.setdefault("no_of_ip", 1)
+ vrf = network.setdefault("vrf", None)
- # AS_PATH attribute
- if attribute == "path":
- # Find next_hop for the route have minimum as_path
- _next_hop = min(
- attribute_dict, key=lambda x: len(set(attribute_dict[x]))
- )
- compare = "SHORTEST"
-
- # LOCAL_PREF attribute
- elif attribute == "locPrf":
- # Find next_hop for the route have highest local preference
- _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
- compare = "HIGHEST"
-
- # WEIGHT attribute
- elif attribute == "weight":
- # Find next_hop for the route have highest weight
- _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
- compare = "HIGHEST"
-
- # ORIGIN attribute
- elif attribute == "origin":
- # Find next_hop for the route have IGP as origin, -
- # - rule is IGP>EGP>INCOMPLETE
- _next_hop = [
- key for (key, value) in attribute_dict.iteritems() if value == "IGP"
- ][0]
- compare = ""
-
- # MED attribute
- elif attribute == "metric":
- # Find next_hop for the route have LOWEST MED
- _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
- compare = "LOWEST"
-
- # Show ip route
- if addr_type == "ipv4":
- command = "show ip route json"
+ if vrf:
+ cmd = "{} vrf {}".format(command, vrf)
else:
- command = "show ipv6 route json"
+ cmd = command
+
+ cmd = "{} {}".format(cmd, addr_type)
+ cmd = "{} json".format(cmd)
+ sh_ip_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ routes = generate_ips(_network, no_of_ip)
+ for route in routes:
+ route = str(ipaddr.IPNetwork(unicode(route)))
+
+ if route in sh_ip_bgp_json["routes"]:
+ route_attributes = sh_ip_bgp_json["routes"][route]
+ _next_hop = None
+ compare = None
+ attribute_dict = {}
+ for route_attribute in route_attributes:
+ next_hops = route_attribute["nexthops"]
+ for next_hop in next_hops:
+ next_hop_ip = next_hop["ip"]
+ attribute_dict[next_hop_ip] = route_attribute[attribute]
+
+ # AS_PATH attribute
+ if attribute == "path":
+ # Find next_hop for the route have minimum as_path
+ _next_hop = min(
+ attribute_dict, key=lambda x: len(set(attribute_dict[x]))
+ )
+ compare = "SHORTEST"
- rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
+ # LOCAL_PREF attribute
+ elif attribute == "locPrf":
+ # Find next_hop for the route have highest local preference
+ _next_hop = max(
+ attribute_dict, key=(lambda k: attribute_dict[k])
+ )
+ compare = "HIGHEST"
- # Verifying output dictionary rib_routes_json is not empty
- if not bool(rib_routes_json):
- errormsg = "No route found in RIB of router {}..".format(router)
- return errormsg
+ # WEIGHT attribute
+ elif attribute == "weight":
+ # Find next_hop for the route have highest weight
+ _next_hop = max(
+ attribute_dict, key=(lambda k: attribute_dict[k])
+ )
+ compare = "HIGHEST"
+
+ # ORIGIN attribute
+ elif attribute == "origin":
+ # Find next_hop for the route have IGP as origin, -
+ # - rule is IGP>EGP>INCOMPLETE
+ _next_hop = [
+ key
+ for (key, value) in attribute_dict.iteritems()
+ if value == "IGP"
+ ][0]
+ compare = ""
+
+ # MED attribute
+ elif attribute == "metric":
+ # Find next_hop for the route have LOWEST MED
+ _next_hop = min(
+ attribute_dict, key=(lambda k: attribute_dict[k])
+ )
+ compare = "LOWEST"
- st_found = False
- nh_found = False
- # Find best is installed in RIB
- if route in rib_routes_json:
- st_found = True
- # Verify next_hop in rib_routes_json
- if rib_routes_json[route][0]["nexthops"][0]["ip"] in attribute_dict:
- nh_found = True
- else:
- errormsg = (
- "Incorrect Nexthop for BGP route {} in "
- "RIB of router {}, Expected: {}, Found:"
- " {}\n".format(
+ # Show ip route
+ if addr_type == "ipv4":
+ command_1 = "show ip route"
+ else:
+ command_1 = "show ipv6 route"
+
+ if vrf:
+ cmd = "{} vrf {} json".format(command_1, vrf)
+ else:
+ cmd = "{} json".format(command_1)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if not bool(rib_routes_json):
+ errormsg = "No route found in RIB of router {}..".format(router)
+ return errormsg
+
+ st_found = False
+ nh_found = False
+ # Find best is installed in RIB
+ if route in rib_routes_json:
+ st_found = True
+ # Verify next_hop in rib_routes_json
+ if (
+ rib_routes_json[route][0]["nexthops"][0]["ip"]
+ in attribute_dict
+ ):
+ nh_found = True
+ else:
+ errormsg = (
+ "Incorrect Nexthop for BGP route {} in "
+ "RIB of router {}, Expected: {}, Found:"
+ " {}\n".format(
+ route,
+ router,
+ rib_routes_json[route][0]["nexthops"][0]["ip"],
+ _next_hop,
+ )
+ )
+ return errormsg
+
+ if st_found and nh_found:
+ logger.info(
+ "Best path for prefix: %s with next_hop: %s is "
+ "installed according to %s %s: (%s) in RIB of "
+ "router %s",
route,
- router,
- rib_routes_json[route][0]["nexthops"][0]["ip"],
_next_hop,
+ compare,
+ attribute,
+ attribute_dict[_next_hop],
+ router,
)
- )
- return errormsg
-
- if st_found and nh_found:
- logger.info(
- "Best path for prefix: %s with next_hop: %s is "
- "installed according to %s %s: (%s) in RIB of "
- "router %s",
- route,
- _next_hop,
- compare,
- attribute,
- attribute_dict[_next_hop],
- router,
- )
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -1965,7 +2257,7 @@ def verify_best_path_as_per_admin_distance(
return True
-@retry(attempts=10, wait=2, return_is_str=True, initial_wait=2)
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
"""
This API is to verify whether bgp rib has any
@@ -1995,7 +2287,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
errormsg(str) or True
"""
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Entering lib API: verify_bgp_rib()")
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
@@ -2210,7 +2502,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
"routes are: {}\n".format(dut, found_routes)
)
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Exiting lib API: verify_bgp_rib()")
return True