summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/bgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/bgp.py')
-rw-r--r--tests/topotests/lib/bgp.py1279
1 files changed, 929 insertions, 350 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 997b72d691..cafba60abf 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -27,13 +27,17 @@ from lib import topotest
from lib.topolog import logger
# Import common_config to use commomnly used APIs
-from lib.common_config import (create_common_configuration,
- InvalidCLIError,
- load_config_to_router,
- check_address_types,
- generate_ips,
- find_interface_with_greater_ip,
- run_frr_cmd, retry)
+from lib.common_config import (
+ create_common_configuration,
+ InvalidCLIError,
+ load_config_to_router,
+ check_address_types,
+ generate_ips,
+ validate_ip_address,
+ find_interface_with_greater_ip,
+ run_frr_cmd,
+ retry,
+)
BGP_CONVERGENCE_TIMEOUT = 10
@@ -79,6 +83,9 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False):
"holddowntimer": 180,
"dest_link": {
"r4": {
+ "allowas-in": {
+ "number_occurences":2
+ },
"prefix_lists": [
{
"name": "pf_list_1",
@@ -126,24 +133,31 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False):
bgp_addr_data = bgp_data.setdefault("address_family", {})
if not bgp_addr_data:
- logger.debug("Router %s: 'address_family' not present in "
- "input_dict for BGP", router)
+ logger.debug(
+ "Router %s: 'address_family' not present in " "input_dict for BGP",
+ router,
+ )
else:
ipv4_data = bgp_addr_data.setdefault("ipv4", {})
ipv6_data = bgp_addr_data.setdefault("ipv6", {})
- neigh_unicast = True if ipv4_data.setdefault("unicast", {}) \
- or ipv6_data.setdefault("unicast", {}) else False
+ neigh_unicast = (
+ True
+ if ipv4_data.setdefault("unicast", {})
+ or ipv6_data.setdefault("unicast", {})
+ else False
+ )
if neigh_unicast:
data_all_bgp = __create_bgp_unicast_neighbor(
- tgen, topo, input_dict, router,
- config_data=data_all_bgp)
+ tgen, topo, input_dict, router, config_data=data_all_bgp
+ )
try:
- result = create_common_configuration(tgen, router, data_all_bgp,
- "bgp", build)
+ result = create_common_configuration(
+ tgen, router, data_all_bgp, "bgp", build
+ )
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
@@ -182,8 +196,9 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
config_data = []
if "local_as" not in bgp_data and build:
- logger.error("Router %s: 'local_as' not present in input_dict"
- "for BGP", router)
+ logger.error(
+ "Router %s: 'local_as' not present in input_dict" "for BGP", router
+ )
return False
local_as = bgp_data.setdefault("local_as", "")
@@ -194,19 +209,20 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
config_data.append(cmd)
+ # Skip RFC8212 in topotests
+ config_data.append("no bgp ebgp-requires-policy")
+
router_id = bgp_data.setdefault("router_id", None)
del_router_id = bgp_data.setdefault("del_router_id", False)
if del_router_id:
config_data.append("no bgp router-id")
if router_id:
- config_data.append("bgp router-id {}".format(
- router_id))
+ config_data.append("bgp router-id {}".format(router_id))
return config_data
-def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
- config_data=None):
+def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, config_data=None):
"""
Helper API to create configuration for address-family unicast
@@ -235,11 +251,8 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
addr_data = addr_dict["unicast"]
if addr_data:
- config_data.append("address-family {} unicast".format(
- addr_type
- ))
- advertise_network = addr_data.setdefault("advertise_networks",
- [])
+ config_data.append("address-family {} unicast".format(addr_type))
+ advertise_network = addr_data.setdefault("advertise_networks", [])
for advertise_network_dict in advertise_network:
network = advertise_network_dict["network"]
if type(network) is not list:
@@ -250,12 +263,10 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
else:
no_of_network = 1
- del_action = advertise_network_dict.setdefault("delete",
- False)
+ del_action = advertise_network_dict.setdefault("delete", False)
# Generating IPs for verification
- prefix = str(
- ipaddr.IPNetwork(unicode(network[0])).prefixlen)
+ prefix = str(ipaddr.IPNetwork(unicode(network[0])).prefixlen)
network_list = generate_ips(network, no_of_network)
for ip in network_list:
ip = str(ipaddr.IPNetwork(unicode(ip)).network)
@@ -271,20 +282,17 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
ibgp = max_paths.setdefault("ibgp", None)
ebgp = max_paths.setdefault("ebgp", None)
if ibgp:
- config_data.append("maximum-paths ibgp {}".format(
- ibgp
- ))
+ config_data.append("maximum-paths ibgp {}".format(ibgp))
if ebgp:
- config_data.append("maximum-paths {}".format(
- ebgp
- ))
+ config_data.append("maximum-paths {}".format(ebgp))
aggregate_addresses = addr_data.setdefault("aggregate_address", [])
for aggregate_address in aggregate_addresses:
network = aggregate_address.setdefault("network", None)
if not network:
- logger.debug("Router %s: 'network' not present in "
- "input_dict for BGP", router)
+ logger.debug(
+ "Router %s: 'network' not present in " "input_dict for BGP", router
+ )
else:
cmd = "aggregate-address {}".format(network)
@@ -305,13 +313,12 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
- logger.error("Router %s: 'redist_type' not present in "
- "input_dict", router)
+ logger.error(
+ "Router %s: 'redist_type' not present in " "input_dict", router
+ )
else:
- cmd = "redistribute {}".format(
- redistribute["redist_type"])
- redist_attr = redistribute.setdefault("attribute",
- None)
+ cmd = "redistribute {}".format(redistribute["redist_type"])
+ redist_attr = redistribute.setdefault("attribute", None)
if redist_attr:
cmd = "{} {}".format(cmd, redist_attr)
del_action = redistribute.setdefault("delete", False)
@@ -320,8 +327,9 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
config_data.append(cmd)
if "neighbor" in addr_data:
- neigh_data = __create_bgp_neighbor(topo, input_dict,
- router, addr_type, add_neigh)
+ neigh_data = __create_bgp_neighbor(
+ topo, input_dict, router, addr_type, add_neigh
+ )
config_data.extend(neigh_data)
for addr_type, addr_dict in bgp_data.iteritems():
@@ -331,11 +339,11 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
addr_data = addr_dict["unicast"]
if "neighbor" in addr_data:
neigh_addr_data = __create_bgp_unicast_address_family(
- topo, input_dict, router, addr_type, add_neigh)
+ topo, input_dict, router, addr_type, add_neigh
+ )
config_data.extend(neigh_addr_data)
-
logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()")
return config_data
@@ -365,12 +373,10 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
update_source = None
if dest_link in nh_details["links"].keys():
- ip_addr = \
- nh_details["links"][dest_link][addr_type].split("/")[0]
+ ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
# Loopback interface
if "source_link" in peer and peer["source_link"] == "lo":
- update_source = topo[router]["links"]["lo"][
- addr_type].split("/")[0]
+ update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]
neigh_cxt = "neighbor {}".format(ip_addr)
@@ -380,41 +386,44 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
config_data.append("address-family ipv6 unicast")
config_data.append("{} activate".format(neigh_cxt))
- disable_connected = peer.setdefault("disable_connected_check",
- False)
+ disable_connected = peer.setdefault("disable_connected_check", False)
keep_alive = peer.setdefault("keepalivetimer", 60)
hold_down = peer.setdefault("holddowntimer", 180)
password = peer.setdefault("password", None)
max_hop_limit = peer.setdefault("ebgp_multihop", 1)
if update_source:
- config_data.append("{} update-source {}".format(
- neigh_cxt, update_source))
+ config_data.append(
+ "{} update-source {}".format(neigh_cxt, update_source)
+ )
if disable_connected:
- config_data.append("{} disable-connected-check".format(
- disable_connected))
+ config_data.append(
+ "{} disable-connected-check".format(disable_connected)
+ )
if update_source:
- config_data.append("{} update-source {}".format(neigh_cxt,
- update_source))
+ config_data.append(
+ "{} update-source {}".format(neigh_cxt, update_source)
+ )
if int(keep_alive) != 60 and int(hold_down) != 180:
config_data.append(
- "{} timers {} {}".format(neigh_cxt, keep_alive,
- hold_down))
+ "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
+ )
if password:
- config_data.append(
- "{} password {}".format(neigh_cxt, password))
+ config_data.append("{} password {}".format(neigh_cxt, password))
if max_hop_limit > 1:
- config_data.append("{} ebgp-multihop {}".format(neigh_cxt,
- max_hop_limit))
+ config_data.append(
+ "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
+ )
config_data.append("{} enforce-multihop".format(neigh_cxt))
logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()")
return config_data
-def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
- add_neigh=True):
+def __create_bgp_unicast_address_family(
+ topo, input_dict, router, addr_type, add_neigh=True
+):
"""
API prints bgp global config to bgp_json file.
@@ -440,37 +449,34 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
nh_details = topo[peer_name]
# Loopback interface
if "source_link" in peer and peer["source_link"] == "lo":
- for destRouterLink, data in sorted(nh_details["links"].
- iteritems()):
+ for destRouterLink, data in sorted(nh_details["links"].iteritems()):
if "type" in data and data["type"] == "loopback":
if dest_link == destRouterLink:
- ip_addr = \
- nh_details["links"][destRouterLink][
- addr_type].split("/")[0]
+ ip_addr = nh_details["links"][destRouterLink][
+ addr_type
+ ].split("/")[0]
# Physical interface
else:
if dest_link in nh_details["links"].keys():
- ip_addr = nh_details["links"][dest_link][
- addr_type].split("/")[0]
+ ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
if addr_type == "ipv4" and bgp_data["ipv6"]:
- deactivate = nh_details["links"][
- dest_link]["ipv6"].split("/")[0]
+ deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[
+ 0
+ ]
neigh_cxt = "neighbor {}".format(ip_addr)
- config_data.append("address-family {} unicast".format(
- addr_type
- ))
+ config_data.append("address-family {} unicast".format(addr_type))
if deactivate:
- config_data.append(
- "no neighbor {} activate".format(deactivate))
+ config_data.append("no neighbor {} activate".format(deactivate))
next_hop_self = peer.setdefault("next_hop_self", None)
send_community = peer.setdefault("send_community", None)
prefix_lists = peer.setdefault("prefix_lists", {})
route_maps = peer.setdefault("route_maps", {})
no_send_community = peer.setdefault("no_send_community", None)
+ allowas_in = peer.setdefault("allowas-in", None)
# next-hop-self
if next_hop_self:
@@ -481,21 +487,30 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
# no_send_community
if no_send_community:
- config_data.append("no {} send-community {}".format(
- neigh_cxt, no_send_community))
+ config_data.append(
+ "no {} send-community {}".format(neigh_cxt, no_send_community)
+ )
+
+ if "allowas_in" in peer:
+ allow_as_in = peer["allowas_in"]
+ config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))
+ if "no_allowas_in" in peer:
+ allow_as_in = peer["no_allowas_in"]
+ config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))
if prefix_lists:
for prefix_list in prefix_lists:
name = prefix_list.setdefault("name", {})
direction = prefix_list.setdefault("direction", "in")
del_action = prefix_list.setdefault("delete", False)
if not name:
- logger.info("Router %s: 'name' not present in "
- "input_dict for BGP neighbor prefix lists",
- router)
+ logger.info(
+ "Router %s: 'name' not present in "
+ "input_dict for BGP neighbor prefix lists",
+ router,
+ )
else:
- cmd = "{} prefix-list {} {}".format(neigh_cxt, name,
- direction)
+ cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
@@ -506,16 +521,28 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
direction = route_map.setdefault("direction", "in")
del_action = route_map.setdefault("delete", False)
if not name:
- logger.info("Router %s: 'name' not present in "
- "input_dict for BGP neighbor route name",
- router)
+ logger.info(
+ "Router %s: 'name' not present in "
+ "input_dict for BGP neighbor route name",
+ router,
+ )
else:
- cmd = "{} route-map {} {}".format(neigh_cxt, name,
- direction)
+ cmd = "{} route-map {} {}".format(neigh_cxt, name, direction)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
+ if allowas_in:
+ number_occurences = allowas_in.setdefault("number_occurences", {})
+ del_action = allowas_in.setdefault("delete", False)
+
+ cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
return config_data
@@ -564,12 +591,10 @@ def verify_router_id(tgen, topo, input_dict):
rnode = tgen.routers()[router]
- del_router_id = input_dict[router]["bgp"].setdefault(
- "del_router_id", False)
+ del_router_id = input_dict[router]["bgp"].setdefault("del_router_id", False)
logger.info("Checking router %s router-id", router)
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
router_id_out = show_bgp_json["ipv4Unicast"]["routerId"]
router_id_out = ipaddr.IPv4Address(unicode(router_id_out))
@@ -582,12 +607,12 @@ def verify_router_id(tgen, topo, input_dict):
router_id = ipaddr.IPv4Address(unicode(router_id))
if router_id == router_id_out:
- logger.info("Found expected router-id %s for router %s",
- router_id, router)
+ logger.info("Found expected router-id %s for router %s", router_id, router)
else:
- errormsg = "Router-id for router:{} mismatch, expected:" \
- " {} but found:{}".format(router, router_id,
- router_id_out)
+ errormsg = (
+ "Router-id for router:{} mismatch, expected:"
+ " {} but found:{}".format(router, router_id, router_id_out)
+ )
return errormsg
logger.debug("Exiting lib API: verify_router_id()")
@@ -617,9 +642,11 @@ def verify_bgp_convergence(tgen, topo):
logger.debug("Entering lib API: verify_bgp_convergence()")
for router, rnode in tgen.routers().iteritems():
+ if "bgp" not in topo["routers"][router]:
+ continue
+
logger.info("Verifying BGP Convergence on router %s", router)
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
@@ -647,15 +674,12 @@ def verify_bgp_convergence(tgen, topo):
for dest_link in peer_data["dest_link"].keys():
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
- neighbor_ip = \
- data[dest_link][addr_type].split("/")[0]
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
if addr_type == "ipv4":
- ipv4_data = show_bgp_json["ipv4Unicast"][
- "peers"]
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
else:
- ipv6_data = show_bgp_json["ipv6Unicast"][
- "peers"]
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
if nh_state == "Established":
@@ -663,8 +687,7 @@ def verify_bgp_convergence(tgen, topo):
if no_of_peer == total_peer:
logger.info("BGP is Converged for router %s", router)
else:
- errormsg = "BGP is not converged for router {}".format(
- router)
+ errormsg = "BGP is not converged for router {}".format(router)
return errormsg
logger.debug("Exiting API: verify_bgp_convergence()")
@@ -707,16 +730,9 @@ def modify_as_number(tgen, topo, input_dict):
for router in input_dict.keys():
# Remove bgp configuration
- router_dict.update({
- router: {
- "bgp": {
- "delete": True
- }
- }
- })
+ router_dict.update({router: {"bgp": {"delete": True}}})
- new_topo[router]["bgp"]["local_as"] = \
- input_dict[router]["bgp"]["local_as"]
+ new_topo[router]["bgp"]["local_as"] = input_dict[router]["bgp"]["local_as"]
logger.info("Removing bgp configuration")
create_router_bgp(tgen, topo, router_dict)
@@ -777,8 +793,9 @@ def verify_as_numbers(tgen, topo, input_dict):
logger.info("Verifying AS numbers for dut %s:", router)
- show_ip_bgp_neighbor_json = run_frr_cmd(rnode,
- "show ip bgp neighbor json", isjson=True)
+ show_ip_bgp_neighbor_json = run_frr_cmd(
+ rnode, "show ip bgp neighbor json", isjson=True
+ )
local_as = input_dict[router]["bgp"]["local_as"]
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
@@ -786,8 +803,7 @@ def verify_as_numbers(tgen, topo, input_dict):
if not check_address_types(addr_type):
continue
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
- "neighbor"]
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
@@ -796,32 +812,42 @@ def verify_as_numbers(tgen, topo, input_dict):
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
- neighbor_ip = data[dest_link][addr_type]. \
- split("/")[0]
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]
# Verify Local AS for router
if neigh_data["localAs"] != local_as:
- errormsg = "Failed: Verify local_as for dut {}," \
- " found: {} but expected: {}".format(
- router, neigh_data["localAs"],
- local_as)
+ errormsg = (
+ "Failed: Verify local_as for dut {},"
+ " found: {} but expected: {}".format(
+ router, neigh_data["localAs"], local_as
+ )
+ )
return errormsg
else:
- logger.info("Verified local_as for dut %s, found"
- " expected: %s", router, local_as)
+ logger.info(
+ "Verified local_as for dut %s, found" " expected: %s",
+ router,
+ local_as,
+ )
# Verify Remote AS for neighbor
if neigh_data["remoteAs"] != remote_as:
- errormsg = "Failed: Verify remote_as for dut " \
- "{}'s neighbor {}, found: {} but " \
- "expected: {}".format(
- router, bgp_neighbor,
- neigh_data["remoteAs"], remote_as)
+ errormsg = (
+ "Failed: Verify remote_as for dut "
+ "{}'s neighbor {}, found: {} but "
+ "expected: {}".format(
+ router, bgp_neighbor, neigh_data["remoteAs"], remote_as
+ )
+ )
return errormsg
else:
- logger.info("Verified remote_as for dut %s's "
- "neighbor %s, found expected: %s",
- router, bgp_neighbor, remote_as)
+ logger.info(
+ "Verified remote_as for dut %s's "
+ "neighbor %s, found expected: %s",
+ router,
+ bgp_neighbor,
+ remote_as,
+ )
logger.debug("Exiting lib API: verify_AS_numbers()")
return True
@@ -862,12 +888,14 @@ def clear_bgp_and_verify(tgen, topo, router):
for retry in range(31):
sleeptime = 3
# Waiting for BGP to converge
- logger.info("Waiting for %s sec for BGP to converge on router"
- " %s...", sleeptime, router)
+ logger.info(
+ "Waiting for %s sec for BGP to converge on router" " %s...",
+ sleeptime,
+ router,
+ )
sleep(sleeptime)
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
@@ -897,38 +925,39 @@ def clear_bgp_and_verify(tgen, topo, router):
if dest_link in data:
neighbor_ip = data[dest_link][addr_type].split("/")[0]
if addr_type == "ipv4":
- ipv4_data = show_bgp_json["ipv4Unicast"][
- "peers"]
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
# Peer up time dictionary
- peer_uptime_before_clear_bgp[bgp_neighbor] = \
- ipv4_data[neighbor_ip]["peerUptimeEstablishedEpoch"]
+ peer_uptime_before_clear_bgp[bgp_neighbor] = ipv4_data[
+ neighbor_ip
+ ]["peerUptimeEstablishedEpoch"]
else:
- ipv6_data = show_bgp_json["ipv6Unicast"][
- "peers"]
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
# Peer up time dictionary
- peer_uptime_before_clear_bgp[bgp_neighbor] = \
- ipv6_data[neighbor_ip]["peerUptimeEstablishedEpoch"]
+ peer_uptime_before_clear_bgp[bgp_neighbor] = ipv6_data[
+ neighbor_ip
+ ]["peerUptimeEstablishedEpoch"]
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
- logger.info("BGP is Converged for router %s before bgp"
- " clear", router)
+ logger.info("BGP is Converged for router %s before bgp" " clear", router)
break
else:
- logger.info("BGP is not yet Converged for router %s "
- "before bgp clear", router)
+ logger.info(
+ "BGP is not yet Converged for router %s " "before bgp clear", router
+ )
else:
- errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \
- " router {}".format(router)
+ errormsg = (
+ "TIMEOUT!! BGP is not converged in 30 seconds for"
+ " router {}".format(router)
+ )
return errormsg
- logger.info(peer_uptime_before_clear_bgp)
# Clearing BGP
logger.info("Clearing BGP neighborship for router %s..", router)
for addr_type in bgp_addr_type.keys():
@@ -942,13 +971,14 @@ def clear_bgp_and_verify(tgen, topo, router):
for retry in range(31):
sleeptime = 3
# Waiting for BGP to converge
- logger.info("Waiting for %s sec for BGP to converge on router"
- " %s...", sleeptime, router)
+ logger.info(
+ "Waiting for %s sec for BGP to converge on router" " %s...",
+ sleeptime,
+ router,
+ )
sleep(sleeptime)
-
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
@@ -975,44 +1005,46 @@ def clear_bgp_and_verify(tgen, topo, router):
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
- neighbor_ip = data[dest_link][addr_type].\
- split("/")[0]
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
if addr_type == "ipv4":
- ipv4_data = show_bgp_json["ipv4Unicast"][
- "peers"]
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
- peer_uptime_after_clear_bgp[bgp_neighbor] = \
- ipv4_data[neighbor_ip]["peerUptimeEstablishedEpoch"]
+ peer_uptime_after_clear_bgp[bgp_neighbor] = ipv4_data[
+ neighbor_ip
+ ]["peerUptimeEstablishedEpoch"]
else:
- ipv6_data = show_bgp_json["ipv6Unicast"][
- "peers"]
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
# Peer up time dictionary
- peer_uptime_after_clear_bgp[bgp_neighbor] = \
- ipv6_data[neighbor_ip]["peerUptimeEstablishedEpoch"]
+ peer_uptime_after_clear_bgp[bgp_neighbor] = ipv6_data[
+ neighbor_ip
+ ]["peerUptimeEstablishedEpoch"]
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
- logger.info("BGP is Converged for router %s after bgp clear",
- router)
+ logger.info("BGP is Converged for router %s after bgp clear", router)
break
else:
- logger.info("BGP is not yet Converged for router %s after"
- " bgp clear", router)
+ logger.info(
+ "BGP is not yet Converged for router %s after" " bgp clear", router
+ )
else:
- errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \
- " router {}".format(router)
+ errormsg = (
+ "TIMEOUT!! BGP is not converged in 30 seconds for"
+ " router {}".format(router)
+ )
return errormsg
- logger.info(peer_uptime_after_clear_bgp)
+
# Comparing peerUptimeEstablishedEpoch dictionaries
if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
- logger.info("BGP neighborship is reset after clear BGP on router %s",
- router)
+ logger.info("BGP neighborship is reset after clear BGP on router %s", router)
else:
- errormsg = "BGP neighborship is not reset after clear bgp on router" \
- " {}".format(router)
+ errormsg = (
+ "BGP neighborship is not reset after clear bgp on router"
+ " {}".format(router)
+ )
return errormsg
logger.debug("Exiting lib API: clear_bgp_and_verify()")
@@ -1060,11 +1092,11 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
rnode = router_list[router]
- logger.info("Verifying bgp timers functionality, DUT is %s:",
- router)
+ logger.info("Verifying bgp timers functionality, DUT is %s:", router)
- show_ip_bgp_neighbor_json = \
- run_frr_cmd(rnode, "show ip bgp neighbor json", isjson=True)
+ show_ip_bgp_neighbor_json = run_frr_cmd(
+ rnode, "show ip bgp neighbor json", isjson=True
+ )
bgp_addr_type = input_dict[router]["bgp"]["address_family"]
@@ -1072,8 +1104,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
if not check_address_types(addr_type):
continue
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
- "neighbor"]
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
for dest_link, peer_dict in peer_data["dest_link"].iteritems():
data = topo["routers"][bgp_neighbor]["links"]
@@ -1082,32 +1113,41 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
holddowntimer = peer_dict["holddowntimer"]
if dest_link in data:
- neighbor_ip = data[dest_link][addr_type]. \
- split("/")[0]
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
neighbor_intf = data[dest_link]["interface"]
# Verify HoldDownTimer for neighbor
- bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[
- neighbor_ip]["bgpTimerHoldTimeMsecs"]
+ bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
+ "bgpTimerHoldTimeMsecs"
+ ]
if bgpHoldTimeMsecs != holddowntimer * 1000:
- errormsg = "Verifying holddowntimer for bgp " \
- "neighbor {} under dut {}, found: {} " \
- "but expected: {}".format(
- neighbor_ip, router,
- bgpHoldTimeMsecs,
- holddowntimer * 1000)
+ errormsg = (
+ "Verifying holddowntimer for bgp "
+ "neighbor {} under dut {}, found: {} "
+ "but expected: {}".format(
+ neighbor_ip,
+ router,
+ bgpHoldTimeMsecs,
+ holddowntimer * 1000,
+ )
+ )
return errormsg
# Verify KeepAliveTimer for neighbor
- bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[
- neighbor_ip]["bgpTimerKeepAliveIntervalMsecs"]
+ bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
+ "bgpTimerKeepAliveIntervalMsecs"
+ ]
if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
- errormsg = "Verifying keepalivetimer for bgp " \
- "neighbor {} under dut {}, found: {} " \
- "but expected: {}".format(
- neighbor_ip, router,
- bgpKeepAliveTimeMsecs,
- keepalivetimer * 1000)
+ errormsg = (
+ "Verifying keepalivetimer for bgp "
+ "neighbor {} under dut {}, found: {} "
+ "but expected: {}".format(
+ neighbor_ip,
+ router,
+ bgpKeepAliveTimeMsecs,
+ keepalivetimer * 1000,
+ )
+ )
return errormsg
####################
@@ -1120,40 +1160,50 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
# Wait till keep alive time
logger.info("=" * 20)
logger.info("Scenario 1:")
- logger.info("Shutdown and bring up peer interface: %s "
- "in keep alive time : %s sec and verify "
- " BGP neighborship is intact in %s sec ",
- neighbor_intf, keepalivetimer,
- (holddowntimer - keepalivetimer))
+ logger.info(
+ "Shutdown and bring up peer interface: %s "
+ "in keep alive time : %s sec and verify "
+ " BGP neighborship is intact in %s sec ",
+ neighbor_intf,
+ keepalivetimer,
+ (holddowntimer - keepalivetimer),
+ )
logger.info("=" * 20)
logger.info("Waiting for %s sec..", keepalivetimer)
sleep(keepalivetimer)
# Shutting down peer ineterface
- logger.info("Shutting down interface %s on router %s",
- neighbor_intf, bgp_neighbor)
+ logger.info(
+ "Shutting down interface %s on router %s",
+ neighbor_intf,
+ bgp_neighbor,
+ )
topotest.interface_set_status(
- router_list[bgp_neighbor], neighbor_intf,
- ifaceaction=False)
+ router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
+ )
# Bringing up peer interface
sleep(5)
- logger.info("Bringing up interface %s on router %s..",
- neighbor_intf, bgp_neighbor)
+ logger.info(
+ "Bringing up interface %s on router %s..",
+ neighbor_intf,
+ bgp_neighbor,
+ )
topotest.interface_set_status(
- router_list[bgp_neighbor], neighbor_intf,
- ifaceaction=True)
+ router_list[bgp_neighbor], neighbor_intf, ifaceaction=True
+ )
# Verifying BGP neighborship is intact in
# (holddown - keepalive) time
- for timer in range(keepalivetimer, holddowntimer,
- int(holddowntimer / 3)):
+ for timer in range(
+ keepalivetimer, holddowntimer, int(holddowntimer / 3)
+ ):
logger.info("Waiting for %s sec..", keepalivetimer)
sleep(keepalivetimer)
sleep(2)
- show_bgp_json = \
- run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(
+ rnode, "show bgp summary json", isjson=True
+ )
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
@@ -1162,17 +1212,22 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
- if timer == \
- (holddowntimer - keepalivetimer):
+ if timer == (holddowntimer - keepalivetimer):
if nh_state != "Established":
- errormsg = "BGP neighborship has not gone " \
- "down in {} sec for neighbor {}" \
- .format(timer, bgp_neighbor)
+ errormsg = (
+ "BGP neighborship has not gone "
+ "down in {} sec for neighbor {}".format(
+ timer, bgp_neighbor
+ )
+ )
return errormsg
else:
- logger.info("BGP neighborship is intact in %s"
- " sec for neighbor %s",
- timer, bgp_neighbor)
+ logger.info(
+ "BGP neighborship is intact in %s"
+ " sec for neighbor %s",
+ timer,
+ bgp_neighbor,
+ )
####################
# Shutting down peer interface and verifying that BGP
@@ -1180,27 +1235,36 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
####################
logger.info("=" * 20)
logger.info("Scenario 2:")
- logger.info("Shutdown peer interface: %s and verify BGP"
- " neighborship has gone down in hold down "
- "time %s sec", neighbor_intf, holddowntimer)
+ logger.info(
+ "Shutdown peer interface: %s and verify BGP"
+ " neighborship has gone down in hold down "
+ "time %s sec",
+ neighbor_intf,
+ holddowntimer,
+ )
logger.info("=" * 20)
- logger.info("Shutting down interface %s on router %s..",
- neighbor_intf, bgp_neighbor)
- topotest.interface_set_status(router_list[bgp_neighbor],
- neighbor_intf,
- ifaceaction=False)
+ logger.info(
+ "Shutting down interface %s on router %s..",
+ neighbor_intf,
+ bgp_neighbor,
+ )
+ topotest.interface_set_status(
+ router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
+ )
# Verifying BGP neighborship is going down in holddown time
- for timer in range(keepalivetimer,
- (holddowntimer + keepalivetimer),
- int(holddowntimer / 3)):
+ for timer in range(
+ keepalivetimer,
+ (holddowntimer + keepalivetimer),
+ int(holddowntimer / 3),
+ ):
logger.info("Waiting for %s sec..", keepalivetimer)
sleep(keepalivetimer)
sleep(2)
- show_bgp_json = \
- run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(
+ rnode, "show bgp summary json", isjson=True
+ )
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
@@ -1211,22 +1275,29 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
if timer == holddowntimer:
if nh_state == "Established":
- errormsg = "BGP neighborship has not gone " \
- "down in {} sec for neighbor {}" \
- .format(timer, bgp_neighbor)
+ errormsg = (
+ "BGP neighborship has not gone "
+ "down in {} sec for neighbor {}".format(
+ timer, bgp_neighbor
+ )
+ )
return errormsg
else:
- logger.info("BGP neighborship has gone down in"
- " %s sec for neighbor %s",
- timer, bgp_neighbor)
+ logger.info(
+ "BGP neighborship has gone down in"
+ " %s sec for neighbor %s",
+ timer,
+ bgp_neighbor,
+ )
logger.debug("Exiting lib API: verify_bgp_timers_and_functionality()")
return True
@retry(attempts=3, wait=4, return_is_str=True)
-def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
- input_dict, seq_id=None):
+def verify_bgp_attributes(
+ tgen, addr_type, dut, static_routes, rmap_name, input_dict, seq_id=None
+):
"""
API will verify BGP attributes set by Route-map for given prefix and
DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
@@ -1256,7 +1327,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
}
},
"set": {
- "localpref": 150,
+ "locPrf": 150,
"weight": 100
}
}],
@@ -1269,7 +1340,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
}
},
"set": {
- "med": 50
+ "metric": 50
}
}]
}
@@ -1288,7 +1359,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
if router != dut:
continue
- logger.info('Verifying BGP set attributes for dut {}:'.format(router))
+ logger.info("Verifying BGP set attributes for dut {}:".format(router))
for static_route in static_routes:
cmd = "show bgp {} {} json".format(addr_type, static_route)
@@ -1297,8 +1368,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
dict_to_test = []
tmp_list = []
for rmap_router in input_dict.keys():
- for rmap, values in input_dict[rmap_router][
- "route_maps"].items():
+ for rmap, values in input_dict[rmap_router]["route_maps"].items():
if rmap == rmap_name:
dict_to_test = values
for rmap_dict in values:
@@ -1307,8 +1377,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
seq_id = [seq_id]
if "seq_id" in rmap_dict:
- rmap_seq_id = \
- rmap_dict["seq_id"]
+ rmap_seq_id = rmap_dict["seq_id"]
for _seq_id in seq_id:
if _seq_id == rmap_seq_id:
tmp_list.append(rmap_dict)
@@ -1318,55 +1387,56 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
for rmap_dict in dict_to_test:
if "set" in rmap_dict:
for criteria in rmap_dict["set"].keys():
- if criteria not in show_bgp_json[
- "paths"][0]:
- errormsg = ("BGP attribute: {}"
- " is not found in"
- " cli: {} output "
- "in router {}".
- format(criteria,
- cmd,
- router))
+ if criteria not in show_bgp_json["paths"][0]:
+ errormsg = (
+ "BGP attribute: {}"
+ " is not found in"
+ " cli: {} output "
+ "in router {}".format(criteria, cmd, router)
+ )
return errormsg
- if rmap_dict["set"][criteria] == \
- show_bgp_json["paths"][0][
- criteria]:
- logger.info("Verifying BGP "
- "attribute {} for"
- " route: {} in "
- "router: {}, found"
- " expected value:"
- " {}".
- format(criteria,
- static_route,
- dut,
- rmap_dict[
- "set"][
- criteria]))
+ if (
+ rmap_dict["set"][criteria]
+ == show_bgp_json["paths"][0][criteria]
+ ):
+ logger.info(
+ "Verifying BGP "
+ "attribute {} for"
+ " route: {} in "
+ "router: {}, found"
+ " expected value:"
+ " {}".format(
+ criteria,
+ static_route,
+ dut,
+ rmap_dict["set"][criteria],
+ )
+ )
else:
- errormsg = \
- ("Failed: Verifying BGP "
- "attribute {} for route:"
- " {} in router: {}, "
- " expected value: {} but"
- " found: {}".
- format(criteria,
- static_route,
- dut,
- rmap_dict["set"]
- [criteria],
- show_bgp_json[
- 'paths'][
- 0][criteria]))
+ errormsg = (
+ "Failed: Verifying BGP "
+ "attribute {} for route:"
+ " {} in router: {}, "
+ " expected value: {} but"
+ " found: {}".format(
+ criteria,
+ static_route,
+ dut,
+ rmap_dict["set"][criteria],
+ show_bgp_json["paths"][0][criteria],
+ )
+ )
return errormsg
logger.debug("Exiting lib API: verify_bgp_attributes()")
return True
+
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
-def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
- attribute):
+def verify_best_path_as_per_bgp_attribute(
+ tgen, addr_type, router, input_dict, attribute
+):
"""
API is to verify best path according to BGP attributes for given routes.
"show bgp ipv4/6 json" command will be run and verify best path according
@@ -1406,7 +1476,7 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
}
}
}
- attribute = "localpref"
+ attribute = "locPrf"
result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
input_dict, attribute)
Returns
@@ -1443,40 +1513,38 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
attribute_dict[next_hop_ip] = route_attribute[attribute]
# AS_PATH attribute
- if attribute == "aspath":
+ if attribute == "path":
# Find next_hop for the route have minimum as_path
- _next_hop = min(attribute_dict, key=lambda x: len(set(
- attribute_dict[x])))
+ _next_hop = min(
+ attribute_dict, key=lambda x: len(set(attribute_dict[x]))
+ )
compare = "SHORTEST"
# LOCAL_PREF attribute
- elif attribute == "localpref":
+ elif attribute == "locPrf":
# Find next_hop for the route have highest local preference
- _next_hop = max(attribute_dict, key=(lambda k:
- attribute_dict[k]))
+ _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "HIGHEST"
# WEIGHT attribute
elif attribute == "weight":
# Find next_hop for the route have highest weight
- _next_hop = max(attribute_dict, key=(lambda k:
- attribute_dict[k]))
+ _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "HIGHEST"
# ORIGIN attribute
elif attribute == "origin":
# Find next_hop for the route have IGP as origin, -
# - rule is IGP>EGP>INCOMPLETE
- _next_hop = [key for (key, value) in
- attribute_dict.iteritems()
- if value == "IGP"][0]
+ _next_hop = [
+ key for (key, value) in attribute_dict.iteritems() if value == "IGP"
+ ][0]
compare = ""
# MED attribute
- elif attribute == "med":
+ elif attribute == "metric":
# Find next_hop for the route have LOWEST MED
- _next_hop = min(attribute_dict, key=(lambda k:
- attribute_dict[k]))
+ _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "LOWEST"
# Show ip route
@@ -1489,8 +1557,7 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
# Verifying output dictionary rib_routes_json is not empty
if not bool(rib_routes_json):
- errormsg = "No route found in RIB of router {}..". \
- format(router)
+ errormsg = "No route found in RIB of router {}..".format(router)
return errormsg
st_found = False
@@ -1499,31 +1566,41 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
if route in rib_routes_json:
st_found = True
# Verify next_hop in rib_routes_json
- if rib_routes_json[route][0]["nexthops"][0]["ip"] in \
- attribute_dict:
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] in attribute_dict:
nh_found = True
else:
- errormsg = "Incorrect Nexthop for BGP route {} in " \
- "RIB of router {}, Expected: {}, Found:" \
- " {}\n".format(route, router,
- rib_routes_json[route][0][
- "nexthops"][0]["ip"],
- _next_hop)
+ errormsg = (
+ "Incorrect Nexthop for BGP route {} in "
+ "RIB of router {}, Expected: {}, Found:"
+ " {}\n".format(
+ route,
+ router,
+ rib_routes_json[route][0]["nexthops"][0]["ip"],
+ _next_hop,
+ )
+ )
return errormsg
if st_found and nh_found:
logger.info(
"Best path for prefix: %s with next_hop: %s is "
"installed according to %s %s: (%s) in RIB of "
- "router %s", route, _next_hop, compare,
- attribute, attribute_dict[_next_hop], router)
+ "router %s",
+ route,
+ _next_hop,
+ compare,
+ attribute,
+ attribute_dict[_next_hop],
+ router,
+ )
logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()")
return True
-def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
- attribute):
+def verify_best_path_as_per_admin_distance(
+ tgen, addr_type, router, input_dict, attribute
+):
"""
API is to verify best path according to admin distance for given
route. "show ip/ipv6 route json" command will be run and verify
@@ -1548,7 +1625,7 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
{"network": "200.50.2.0/32", \
"admin_distance": 60, "next_hop": "10.0.0.18"}]
}}
- attribute = "localpref"
+ attribute = "locPrf"
result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
input_dict, attribute):
Returns
@@ -1574,7 +1651,8 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
for routes_from_router in input_dict.keys():
sh_ip_route_json = router_list[routes_from_router].vtysh_cmd(
- command, isjson=True)
+ command, isjson=True
+ )
networks = input_dict[routes_from_router]["static_routes"]
for network in networks:
route = network["network"]
@@ -1590,8 +1668,7 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
attribute_dict[next_hop_ip] = route_attribute["distance"]
# Find next_hop for the route have LOWEST Admin Distance
- _next_hop = min(attribute_dict, key=(lambda k:
- attribute_dict[k]))
+ _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "LOWEST"
# Show ip route
@@ -1608,21 +1685,523 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
if route in rib_routes_json:
st_found = True
# Verify next_hop in rib_routes_json
- if rib_routes_json[route][0]["nexthops"][0]["ip"] == \
- _next_hop:
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] == _next_hop:
nh_found = True
else:
- errormsg = ("Nexthop {} is Missing for BGP route {}"
- " in RIB of router {}\n".format(_next_hop,
- route, router))
+ errormsg = (
+ "Nexthop {} is Missing for BGP route {}"
+ " in RIB of router {}\n".format(_next_hop, route, router)
+ )
return errormsg
if st_found and nh_found:
- logger.info("Best path for prefix: %s is installed according"
- " to %s %s: (%s) in RIB of router %s", route,
- compare, attribute,
- attribute_dict[_next_hop], router)
+ logger.info(
+ "Best path for prefix: %s is installed according"
+ " to %s %s: (%s) in RIB of router %s",
+ route,
+ compare,
+ attribute,
+ attribute_dict[_next_hop],
+ router,
+ )
+
+ logger.info("Exiting lib API: verify_best_path_as_per_admin_distance()")
+ return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
+ """
+ This API is to verify whether bgp rib has any
+ matching route for a nexthop.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: input dut router name
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default = static
+ * 'aspath'[optional]: aspath which needs to be verified
+
+ Usage
+ -----
+ dut = 'r1'
+ next_hop = "192.168.1.10"
+ input_dict = topo['routers']
+ aspath = "100 200 300"
+ result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
+ next_hop, aspath)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_bgp_rib()")
+
+ router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ list1 = []
+ list2 = []
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ command = "show bgp"
+
+ # Static routes
+ sleep(2)
+ logger.info("Checking router {} BGP RIB:".format(dut))
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ found_routes = []
+ missing_routes = []
+ st_found = False
+ nh_found = False
+ vrf = static_route.setdefault("vrf", None)
+ if vrf:
+ cmd = "{} vrf {} {}".format(command, vrf, addr_type)
+
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ network = static_route["network"]
+
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if not isinstance(next_hop, list):
+ next_hop = [next_hop]
+ list1 = next_hop
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in rib_routes_json["routes"][st_rt][0][
+ "nexthops"
+ ]
+ ]
+ list2 = found_hops
+ missing_list_of_nexthops = set(list2).difference(list1)
+ additional_nexthops_in_required_nhs = set(
+ list1
+ ).difference(list2)
+
+ if list2:
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+ if aspath:
+ found_paths = rib_routes_json["routes"][st_rt][0][
+ "path"
+ ]
+ if aspath == found_paths:
+ aspath_found = True
+ logger.info(
+ "Found AS path {} for route"
+ " {} in RIB of router "
+ "{}\n".format(aspath, st_rt, dut)
+ )
+ else:
+ errormsg = (
+ "AS Path {} is missing for route"
+ "for route {} in RIB of router {}\n".format(
+ aspath, st_rt, dut
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all bgp"
+ " routes in RIB of"
+ " router {}\n".format(next_hop, router)
+ )
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in RIB of router {}, "
+ "routes: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, "
+ "found routes are: {} \n".format(dut, found_routes)
+ )
+ continue
+
+ if "bgp" not in input_dict[routerInput]:
+ continue
+
+ # Advertise networks
+ bgp_data_list = input_dict[routerInput]["bgp"]
+
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
+
+ for bgp_data in bgp_data_list:
+ vrf_id = bgp_data.setdefault("vrf", None)
+ if vrf_id:
+ cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
+ advertise_network = bgp_net_advertise.setdefault(
+ "advertise_networks", []
+ )
+
+ for advertise_network_dict in advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+
+ network = advertise_network_dict["network"]
+
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_network)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ found = False
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in BGP RIB of router {},"
+ " are: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, found "
+ "routes are: {}\n".format(dut, found_routes)
+ )
+
+ logger.debug("Exiting lib API: verify_bgp_rib()")
+ return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
+ """
+ This API is to verify whether bgp rib has any
+ matching route for a nexthop.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: input dut router name
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default = static
+ * 'aspath'[optional]: aspath which needs to be verified
+
+ Usage
+ -----
+ dut = 'r1'
+ next_hop = "192.168.1.10"
+ input_dict = topo['routers']
+ aspath = "100 200 300"
+ result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
+ next_hop, aspath)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_bgp_rib()")
+
+ router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ list1 = []
+ list2 = []
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ command = "show bgp"
+
+ # Static routes
+ sleep(2)
+ logger.info("Checking router {} BGP RIB:".format(dut))
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ found_routes = []
+ missing_routes = []
+ st_found = False
+ nh_found = False
+ vrf = static_route.setdefault("vrf", None)
+ if vrf:
+ cmd = "{} vrf {} {}".format(command, vrf, addr_type)
+
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ network = static_route["network"]
+
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if not isinstance(next_hop, list):
+ next_hop = [next_hop]
+ list1 = next_hop
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in rib_routes_json["routes"][st_rt][0][
+ "nexthops"
+ ]
+ ]
+ list2 = found_hops
+ missing_list_of_nexthops = set(list2).difference(list1)
+ additional_nexthops_in_required_nhs = set(
+ list1
+ ).difference(list2)
+
+ if list2:
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+ if aspath:
+ found_paths = rib_routes_json["routes"][st_rt][0][
+ "path"
+ ]
+ if aspath == found_paths:
+ aspath_found = True
+ logger.info(
+ "Found AS path {} for route"
+ " {} in RIB of router "
+ "{}\n".format(aspath, st_rt, dut)
+ )
+ else:
+ errormsg = (
+ "AS Path {} is missing for route"
+ "for route {} in RIB of router {}\n".format(
+ aspath, st_rt, dut
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all bgp"
+ " routes in RIB of"
+ " router {}\n".format(next_hop, router)
+ )
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in RIB of router {}, "
+ "routes: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, "
+ "found routes are: {} \n".format(dut, found_routes)
+ )
+ continue
+
+ if "bgp" not in input_dict[routerInput]:
+ continue
+
+ # Advertise networks
+ bgp_data_list = input_dict[routerInput]["bgp"]
+
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
+
+ for bgp_data in bgp_data_list:
+ vrf_id = bgp_data.setdefault("vrf", None)
+ if vrf_id:
+ cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
+ advertise_network = bgp_net_advertise.setdefault(
+ "advertise_networks", []
+ )
+
+ for advertise_network_dict in advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+
+ network = advertise_network_dict["network"]
+
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_network)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ found = False
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in BGP RIB of router {},"
+ " are: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, found "
+ "routes are: {}\n".format(dut, found_routes)
+ )
- logger.info(
- "Exiting lib API: verify_best_path_as_per_admin_distance()")
+ logger.debug("Exiting lib API: verify_bgp_rib()")
return True