summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/bgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/bgp.py')
-rw-r--r--tests/topotests/lib/bgp.py737
1 files changed, 395 insertions, 342 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 15f970ba75..d183fb9f60 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -27,13 +27,16 @@ from lib import topotest
from lib.topolog import logger
# Import common_config to use commomnly used APIs
-from lib.common_config import (create_common_configuration,
- InvalidCLIError,
- load_config_to_router,
- check_address_types,
- generate_ips,
- find_interface_with_greater_ip,
- run_frr_cmd, retry)
+from lib.common_config import (
+ create_common_configuration,
+ InvalidCLIError,
+ load_config_to_router,
+ check_address_types,
+ generate_ips,
+ find_interface_with_greater_ip,
+ run_frr_cmd,
+ retry,
+)
BGP_CONVERGENCE_TIMEOUT = 10
@@ -126,24 +129,31 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False):
bgp_addr_data = bgp_data.setdefault("address_family", {})
if not bgp_addr_data:
- logger.debug("Router %s: 'address_family' not present in "
- "input_dict for BGP", router)
+ logger.debug(
+ "Router %s: 'address_family' not present in " "input_dict for BGP",
+ router,
+ )
else:
ipv4_data = bgp_addr_data.setdefault("ipv4", {})
ipv6_data = bgp_addr_data.setdefault("ipv6", {})
- neigh_unicast = True if ipv4_data.setdefault("unicast", {}) \
- or ipv6_data.setdefault("unicast", {}) else False
+ neigh_unicast = (
+ True
+ if ipv4_data.setdefault("unicast", {})
+ or ipv6_data.setdefault("unicast", {})
+ else False
+ )
if neigh_unicast:
data_all_bgp = __create_bgp_unicast_neighbor(
- tgen, topo, input_dict, router,
- config_data=data_all_bgp)
+ tgen, topo, input_dict, router, config_data=data_all_bgp
+ )
try:
- result = create_common_configuration(tgen, router, data_all_bgp,
- "bgp", build)
+ result = create_common_configuration(
+ tgen, router, data_all_bgp, "bgp", build
+ )
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
@@ -182,8 +192,9 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
config_data = []
if "local_as" not in bgp_data and build:
- logger.error("Router %s: 'local_as' not present in input_dict"
- "for BGP", router)
+ logger.error(
+ "Router %s: 'local_as' not present in input_dict" "for BGP", router
+ )
return False
local_as = bgp_data.setdefault("local_as", "")
@@ -199,14 +210,12 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
if del_router_id:
config_data.append("no bgp router-id")
if router_id:
- config_data.append("bgp router-id {}".format(
- router_id))
+ config_data.append("bgp router-id {}".format(router_id))
return config_data
-def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
- config_data=None):
+def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, config_data=None):
"""
Helper API to create configuration for address-family unicast
@@ -235,11 +244,8 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
addr_data = addr_dict["unicast"]
if addr_data:
- config_data.append("address-family {} unicast".format(
- addr_type
- ))
- advertise_network = addr_data.setdefault("advertise_networks",
- [])
+ config_data.append("address-family {} unicast".format(addr_type))
+ advertise_network = addr_data.setdefault("advertise_networks", [])
for advertise_network_dict in advertise_network:
network = advertise_network_dict["network"]
if type(network) is not list:
@@ -250,12 +256,10 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
else:
no_of_network = 1
- del_action = advertise_network_dict.setdefault("delete",
- False)
+ del_action = advertise_network_dict.setdefault("delete", False)
# Generating IPs for verification
- prefix = str(
- ipaddr.IPNetwork(unicode(network[0])).prefixlen)
+ prefix = str(ipaddr.IPNetwork(unicode(network[0])).prefixlen)
network_list = generate_ips(network, no_of_network)
for ip in network_list:
ip = str(ipaddr.IPNetwork(unicode(ip)).network)
@@ -271,20 +275,17 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
ibgp = max_paths.setdefault("ibgp", None)
ebgp = max_paths.setdefault("ebgp", None)
if ibgp:
- config_data.append("maximum-paths ibgp {}".format(
- ibgp
- ))
+ config_data.append("maximum-paths ibgp {}".format(ibgp))
if ebgp:
- config_data.append("maximum-paths {}".format(
- ebgp
- ))
+ config_data.append("maximum-paths {}".format(ebgp))
aggregate_addresses = addr_data.setdefault("aggregate_address", [])
for aggregate_address in aggregate_addresses:
network = aggregate_address.setdefault("network", None)
if not network:
- logger.debug("Router %s: 'network' not present in "
- "input_dict for BGP", router)
+ logger.debug(
+ "Router %s: 'network' not present in " "input_dict for BGP", router
+ )
else:
cmd = "aggregate-address {}".format(network)
@@ -305,13 +306,12 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
- logger.error("Router %s: 'redist_type' not present in "
- "input_dict", router)
+ logger.error(
+ "Router %s: 'redist_type' not present in " "input_dict", router
+ )
else:
- cmd = "redistribute {}".format(
- redistribute["redist_type"])
- redist_attr = redistribute.setdefault("attribute",
- None)
+ cmd = "redistribute {}".format(redistribute["redist_type"])
+ redist_attr = redistribute.setdefault("attribute", None)
if redist_attr:
cmd = "{} {}".format(cmd, redist_attr)
del_action = redistribute.setdefault("delete", False)
@@ -320,8 +320,9 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
config_data.append(cmd)
if "neighbor" in addr_data:
- neigh_data = __create_bgp_neighbor(topo, input_dict,
- router, addr_type, add_neigh)
+ neigh_data = __create_bgp_neighbor(
+ topo, input_dict, router, addr_type, add_neigh
+ )
config_data.extend(neigh_data)
for addr_type, addr_dict in bgp_data.iteritems():
@@ -331,11 +332,11 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
addr_data = addr_dict["unicast"]
if "neighbor" in addr_data:
neigh_addr_data = __create_bgp_unicast_address_family(
- topo, input_dict, router, addr_type, add_neigh)
+ topo, input_dict, router, addr_type, add_neigh
+ )
config_data.extend(neigh_addr_data)
-
logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()")
return config_data
@@ -365,12 +366,10 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
update_source = None
if dest_link in nh_details["links"].keys():
- ip_addr = \
- nh_details["links"][dest_link][addr_type].split("/")[0]
+ ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
# Loopback interface
if "source_link" in peer and peer["source_link"] == "lo":
- update_source = topo[router]["links"]["lo"][
- addr_type].split("/")[0]
+ update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]
neigh_cxt = "neighbor {}".format(ip_addr)
@@ -380,41 +379,44 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
config_data.append("address-family ipv6 unicast")
config_data.append("{} activate".format(neigh_cxt))
- disable_connected = peer.setdefault("disable_connected_check",
- False)
+ disable_connected = peer.setdefault("disable_connected_check", False)
keep_alive = peer.setdefault("keepalivetimer", 60)
hold_down = peer.setdefault("holddowntimer", 180)
password = peer.setdefault("password", None)
max_hop_limit = peer.setdefault("ebgp_multihop", 1)
if update_source:
- config_data.append("{} update-source {}".format(
- neigh_cxt, update_source))
+ config_data.append(
+ "{} update-source {}".format(neigh_cxt, update_source)
+ )
if disable_connected:
- config_data.append("{} disable-connected-check".format(
- disable_connected))
+ config_data.append(
+ "{} disable-connected-check".format(disable_connected)
+ )
if update_source:
- config_data.append("{} update-source {}".format(neigh_cxt,
- update_source))
+ config_data.append(
+ "{} update-source {}".format(neigh_cxt, update_source)
+ )
if int(keep_alive) != 60 and int(hold_down) != 180:
config_data.append(
- "{} timers {} {}".format(neigh_cxt, keep_alive,
- hold_down))
+ "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
+ )
if password:
- config_data.append(
- "{} password {}".format(neigh_cxt, password))
+ config_data.append("{} password {}".format(neigh_cxt, password))
if max_hop_limit > 1:
- config_data.append("{} ebgp-multihop {}".format(neigh_cxt,
- max_hop_limit))
+ config_data.append(
+ "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
+ )
config_data.append("{} enforce-multihop".format(neigh_cxt))
logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()")
return config_data
-def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
- add_neigh=True):
+def __create_bgp_unicast_address_family(
+ topo, input_dict, router, addr_type, add_neigh=True
+):
"""
API prints bgp global config to bgp_json file.
@@ -440,31 +442,27 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
nh_details = topo[peer_name]
# Loopback interface
if "source_link" in peer and peer["source_link"] == "lo":
- for destRouterLink, data in sorted(nh_details["links"].
- iteritems()):
+ for destRouterLink, data in sorted(nh_details["links"].iteritems()):
if "type" in data and data["type"] == "loopback":
if dest_link == destRouterLink:
- ip_addr = \
- nh_details["links"][destRouterLink][
- addr_type].split("/")[0]
+ ip_addr = nh_details["links"][destRouterLink][
+ addr_type
+ ].split("/")[0]
# Physical interface
else:
if dest_link in nh_details["links"].keys():
- ip_addr = nh_details["links"][dest_link][
- addr_type].split("/")[0]
+ ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
if addr_type == "ipv4" and bgp_data["ipv6"]:
- deactivate = nh_details["links"][
- dest_link]["ipv6"].split("/")[0]
+ deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[
+ 0
+ ]
neigh_cxt = "neighbor {}".format(ip_addr)
- config_data.append("address-family {} unicast".format(
- addr_type
- ))
+ config_data.append("address-family {} unicast".format(addr_type))
if deactivate:
- config_data.append(
- "no neighbor {} activate".format(deactivate))
+ config_data.append("no neighbor {} activate".format(deactivate))
next_hop_self = peer.setdefault("next_hop_self", None)
send_community = peer.setdefault("send_community", None)
@@ -481,8 +479,9 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
# no_send_community
if no_send_community:
- config_data.append("no {} send-community {}".format(
- neigh_cxt, no_send_community))
+ config_data.append(
+ "no {} send-community {}".format(neigh_cxt, no_send_community)
+ )
if prefix_lists:
for prefix_list in prefix_lists:
@@ -490,12 +489,13 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
direction = prefix_list.setdefault("direction", "in")
del_action = prefix_list.setdefault("delete", False)
if not name:
- logger.info("Router %s: 'name' not present in "
- "input_dict for BGP neighbor prefix lists",
- router)
+ logger.info(
+ "Router %s: 'name' not present in "
+ "input_dict for BGP neighbor prefix lists",
+ router,
+ )
else:
- cmd = "{} prefix-list {} {}".format(neigh_cxt, name,
- direction)
+ cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
@@ -506,12 +506,13 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
direction = route_map.setdefault("direction", "in")
del_action = route_map.setdefault("delete", False)
if not name:
- logger.info("Router %s: 'name' not present in "
- "input_dict for BGP neighbor route name",
- router)
+ logger.info(
+ "Router %s: 'name' not present in "
+ "input_dict for BGP neighbor route name",
+ router,
+ )
else:
- cmd = "{} route-map {} {}".format(neigh_cxt, name,
- direction)
+ cmd = "{} route-map {} {}".format(neigh_cxt, name, direction)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
@@ -564,12 +565,10 @@ def verify_router_id(tgen, topo, input_dict):
rnode = tgen.routers()[router]
- del_router_id = input_dict[router]["bgp"].setdefault(
- "del_router_id", False)
+ del_router_id = input_dict[router]["bgp"].setdefault("del_router_id", False)
logger.info("Checking router %s router-id", router)
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
router_id_out = show_bgp_json["ipv4Unicast"]["routerId"]
router_id_out = ipaddr.IPv4Address(unicode(router_id_out))
@@ -582,12 +581,12 @@ def verify_router_id(tgen, topo, input_dict):
router_id = ipaddr.IPv4Address(unicode(router_id))
if router_id == router_id_out:
- logger.info("Found expected router-id %s for router %s",
- router_id, router)
+ logger.info("Found expected router-id %s for router %s", router_id, router)
else:
- errormsg = "Router-id for router:{} mismatch, expected:" \
- " {} but found:{}".format(router, router_id,
- router_id_out)
+ errormsg = (
+ "Router-id for router:{} mismatch, expected:"
+ " {} but found:{}".format(router, router_id, router_id_out)
+ )
return errormsg
logger.debug("Exiting lib API: verify_router_id()")
@@ -618,8 +617,7 @@ def verify_bgp_convergence(tgen, topo):
logger.debug("Entering lib API: verify_bgp_convergence()")
for router, rnode in tgen.routers().iteritems():
logger.info("Verifying BGP Convergence on router %s", router)
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
@@ -647,15 +645,12 @@ def verify_bgp_convergence(tgen, topo):
for dest_link in peer_data["dest_link"].keys():
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
- neighbor_ip = \
- data[dest_link][addr_type].split("/")[0]
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
if addr_type == "ipv4":
- ipv4_data = show_bgp_json["ipv4Unicast"][
- "peers"]
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
else:
- ipv6_data = show_bgp_json["ipv6Unicast"][
- "peers"]
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
if nh_state == "Established":
@@ -663,8 +658,7 @@ def verify_bgp_convergence(tgen, topo):
if no_of_peer == total_peer:
logger.info("BGP is Converged for router %s", router)
else:
- errormsg = "BGP is not converged for router {}".format(
- router)
+ errormsg = "BGP is not converged for router {}".format(router)
return errormsg
logger.debug("Exiting API: verify_bgp_convergence()")
@@ -707,16 +701,9 @@ def modify_as_number(tgen, topo, input_dict):
for router in input_dict.keys():
# Remove bgp configuration
- router_dict.update({
- router: {
- "bgp": {
- "delete": True
- }
- }
- })
+ router_dict.update({router: {"bgp": {"delete": True}}})
- new_topo[router]["bgp"]["local_as"] = \
- input_dict[router]["bgp"]["local_as"]
+ new_topo[router]["bgp"]["local_as"] = input_dict[router]["bgp"]["local_as"]
logger.info("Removing bgp configuration")
create_router_bgp(tgen, topo, router_dict)
@@ -777,8 +764,9 @@ def verify_as_numbers(tgen, topo, input_dict):
logger.info("Verifying AS numbers for dut %s:", router)
- show_ip_bgp_neighbor_json = run_frr_cmd(rnode,
- "show ip bgp neighbor json", isjson=True)
+ show_ip_bgp_neighbor_json = run_frr_cmd(
+ rnode, "show ip bgp neighbor json", isjson=True
+ )
local_as = input_dict[router]["bgp"]["local_as"]
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
@@ -786,8 +774,7 @@ def verify_as_numbers(tgen, topo, input_dict):
if not check_address_types(addr_type):
continue
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
- "neighbor"]
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
@@ -796,32 +783,42 @@ def verify_as_numbers(tgen, topo, input_dict):
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
- neighbor_ip = data[dest_link][addr_type]. \
- split("/")[0]
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]
# Verify Local AS for router
if neigh_data["localAs"] != local_as:
- errormsg = "Failed: Verify local_as for dut {}," \
- " found: {} but expected: {}".format(
- router, neigh_data["localAs"],
- local_as)
+ errormsg = (
+ "Failed: Verify local_as for dut {},"
+ " found: {} but expected: {}".format(
+ router, neigh_data["localAs"], local_as
+ )
+ )
return errormsg
else:
- logger.info("Verified local_as for dut %s, found"
- " expected: %s", router, local_as)
+ logger.info(
+ "Verified local_as for dut %s, found" " expected: %s",
+ router,
+ local_as,
+ )
# Verify Remote AS for neighbor
if neigh_data["remoteAs"] != remote_as:
- errormsg = "Failed: Verify remote_as for dut " \
- "{}'s neighbor {}, found: {} but " \
- "expected: {}".format(
- router, bgp_neighbor,
- neigh_data["remoteAs"], remote_as)
+ errormsg = (
+ "Failed: Verify remote_as for dut "
+ "{}'s neighbor {}, found: {} but "
+ "expected: {}".format(
+ router, bgp_neighbor, neigh_data["remoteAs"], remote_as
+ )
+ )
return errormsg
else:
- logger.info("Verified remote_as for dut %s's "
- "neighbor %s, found expected: %s",
- router, bgp_neighbor, remote_as)
+ logger.info(
+ "Verified remote_as for dut %s's "
+ "neighbor %s, found expected: %s",
+ router,
+ bgp_neighbor,
+ remote_as,
+ )
logger.debug("Exiting lib API: verify_AS_numbers()")
return True
@@ -862,12 +859,14 @@ def clear_bgp_and_verify(tgen, topo, router):
for retry in range(31):
sleeptime = 3
# Waiting for BGP to converge
- logger.info("Waiting for %s sec for BGP to converge on router"
- " %s...", sleeptime, router)
+ logger.info(
+ "Waiting for %s sec for BGP to converge on router" " %s...",
+ sleeptime,
+ router,
+ )
sleep(sleeptime)
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
@@ -897,35 +896,37 @@ def clear_bgp_and_verify(tgen, topo, router):
if dest_link in data:
neighbor_ip = data[dest_link][addr_type].split("/")[0]
if addr_type == "ipv4":
- ipv4_data = show_bgp_json["ipv4Unicast"][
- "peers"]
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
# Peer up time dictionary
- peer_uptime_before_clear_bgp[bgp_neighbor] = \
- ipv4_data[neighbor_ip]["peerUptimeEstablishedEpoch"]
+ peer_uptime_before_clear_bgp[bgp_neighbor] = ipv4_data[
+ neighbor_ip
+ ]["peerUptimeEstablishedEpoch"]
else:
- ipv6_data = show_bgp_json["ipv6Unicast"][
- "peers"]
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
# Peer up time dictionary
- peer_uptime_before_clear_bgp[bgp_neighbor] = \
- ipv6_data[neighbor_ip]["peerUptimeEstablishedEpoch"]
+ peer_uptime_before_clear_bgp[bgp_neighbor] = ipv6_data[
+ neighbor_ip
+ ]["peerUptimeEstablishedEpoch"]
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
- logger.info("BGP is Converged for router %s before bgp"
- " clear", router)
+ logger.info("BGP is Converged for router %s before bgp" " clear", router)
break
else:
- logger.info("BGP is not yet Converged for router %s "
- "before bgp clear", router)
+ logger.info(
+ "BGP is not yet Converged for router %s " "before bgp clear", router
+ )
else:
- errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \
- " router {}".format(router)
+ errormsg = (
+ "TIMEOUT!! BGP is not converged in 30 seconds for"
+ " router {}".format(router)
+ )
return errormsg
logger.info(peer_uptime_before_clear_bgp)
@@ -942,13 +943,14 @@ def clear_bgp_and_verify(tgen, topo, router):
for retry in range(31):
sleeptime = 3
# Waiting for BGP to converge
- logger.info("Waiting for %s sec for BGP to converge on router"
- " %s...", sleeptime, router)
+ logger.info(
+ "Waiting for %s sec for BGP to converge on router" " %s...",
+ sleeptime,
+ router,
+ )
sleep(sleeptime)
-
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
@@ -975,44 +977,46 @@ def clear_bgp_and_verify(tgen, topo, router):
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
- neighbor_ip = data[dest_link][addr_type].\
- split("/")[0]
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
if addr_type == "ipv4":
- ipv4_data = show_bgp_json["ipv4Unicast"][
- "peers"]
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
- peer_uptime_after_clear_bgp[bgp_neighbor] = \
- ipv4_data[neighbor_ip]["peerUptimeEstablishedEpoch"]
+ peer_uptime_after_clear_bgp[bgp_neighbor] = ipv4_data[
+ neighbor_ip
+ ]["peerUptimeEstablishedEpoch"]
else:
- ipv6_data = show_bgp_json["ipv6Unicast"][
- "peers"]
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
# Peer up time dictionary
- peer_uptime_after_clear_bgp[bgp_neighbor] = \
- ipv6_data[neighbor_ip]["peerUptimeEstablishedEpoch"]
+ peer_uptime_after_clear_bgp[bgp_neighbor] = ipv6_data[
+ neighbor_ip
+ ]["peerUptimeEstablishedEpoch"]
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
- logger.info("BGP is Converged for router %s after bgp clear",
- router)
+ logger.info("BGP is Converged for router %s after bgp clear", router)
break
else:
- logger.info("BGP is not yet Converged for router %s after"
- " bgp clear", router)
+ logger.info(
+ "BGP is not yet Converged for router %s after" " bgp clear", router
+ )
else:
- errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \
- " router {}".format(router)
+ errormsg = (
+ "TIMEOUT!! BGP is not converged in 30 seconds for"
+ " router {}".format(router)
+ )
return errormsg
logger.info(peer_uptime_after_clear_bgp)
# Comparing peerUptimeEstablishedEpoch dictionaries
if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
- logger.info("BGP neighborship is reset after clear BGP on router %s",
- router)
+ logger.info("BGP neighborship is reset after clear BGP on router %s", router)
else:
- errormsg = "BGP neighborship is not reset after clear bgp on router" \
- " {}".format(router)
+ errormsg = (
+ "BGP neighborship is not reset after clear bgp on router"
+ " {}".format(router)
+ )
return errormsg
logger.debug("Exiting lib API: clear_bgp_and_verify()")
@@ -1060,11 +1064,11 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
rnode = router_list[router]
- logger.info("Verifying bgp timers functionality, DUT is %s:",
- router)
+ logger.info("Verifying bgp timers functionality, DUT is %s:", router)
- show_ip_bgp_neighbor_json = \
- run_frr_cmd(rnode, "show ip bgp neighbor json", isjson=True)
+ show_ip_bgp_neighbor_json = run_frr_cmd(
+ rnode, "show ip bgp neighbor json", isjson=True
+ )
bgp_addr_type = input_dict[router]["bgp"]["address_family"]
@@ -1072,8 +1076,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
if not check_address_types(addr_type):
continue
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
- "neighbor"]
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
for dest_link, peer_dict in peer_data["dest_link"].iteritems():
data = topo["routers"][bgp_neighbor]["links"]
@@ -1082,32 +1085,41 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
holddowntimer = peer_dict["holddowntimer"]
if dest_link in data:
- neighbor_ip = data[dest_link][addr_type]. \
- split("/")[0]
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
neighbor_intf = data[dest_link]["interface"]
# Verify HoldDownTimer for neighbor
- bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[
- neighbor_ip]["bgpTimerHoldTimeMsecs"]
+ bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
+ "bgpTimerHoldTimeMsecs"
+ ]
if bgpHoldTimeMsecs != holddowntimer * 1000:
- errormsg = "Verifying holddowntimer for bgp " \
- "neighbor {} under dut {}, found: {} " \
- "but expected: {}".format(
- neighbor_ip, router,
- bgpHoldTimeMsecs,
- holddowntimer * 1000)
+ errormsg = (
+ "Verifying holddowntimer for bgp "
+ "neighbor {} under dut {}, found: {} "
+ "but expected: {}".format(
+ neighbor_ip,
+ router,
+ bgpHoldTimeMsecs,
+ holddowntimer * 1000,
+ )
+ )
return errormsg
# Verify KeepAliveTimer for neighbor
- bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[
- neighbor_ip]["bgpTimerKeepAliveIntervalMsecs"]
+ bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
+ "bgpTimerKeepAliveIntervalMsecs"
+ ]
if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
- errormsg = "Verifying keepalivetimer for bgp " \
- "neighbor {} under dut {}, found: {} " \
- "but expected: {}".format(
- neighbor_ip, router,
- bgpKeepAliveTimeMsecs,
- keepalivetimer * 1000)
+ errormsg = (
+ "Verifying keepalivetimer for bgp "
+ "neighbor {} under dut {}, found: {} "
+ "but expected: {}".format(
+ neighbor_ip,
+ router,
+ bgpKeepAliveTimeMsecs,
+ keepalivetimer * 1000,
+ )
+ )
return errormsg
####################
@@ -1120,40 +1132,50 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
# Wait till keep alive time
logger.info("=" * 20)
logger.info("Scenario 1:")
- logger.info("Shutdown and bring up peer interface: %s "
- "in keep alive time : %s sec and verify "
- " BGP neighborship is intact in %s sec ",
- neighbor_intf, keepalivetimer,
- (holddowntimer - keepalivetimer))
+ logger.info(
+ "Shutdown and bring up peer interface: %s "
+ "in keep alive time : %s sec and verify "
+ " BGP neighborship is intact in %s sec ",
+ neighbor_intf,
+ keepalivetimer,
+ (holddowntimer - keepalivetimer),
+ )
logger.info("=" * 20)
logger.info("Waiting for %s sec..", keepalivetimer)
sleep(keepalivetimer)
# Shutting down peer ineterface
- logger.info("Shutting down interface %s on router %s",
- neighbor_intf, bgp_neighbor)
+ logger.info(
+ "Shutting down interface %s on router %s",
+ neighbor_intf,
+ bgp_neighbor,
+ )
topotest.interface_set_status(
- router_list[bgp_neighbor], neighbor_intf,
- ifaceaction=False)
+ router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
+ )
# Bringing up peer interface
sleep(5)
- logger.info("Bringing up interface %s on router %s..",
- neighbor_intf, bgp_neighbor)
+ logger.info(
+ "Bringing up interface %s on router %s..",
+ neighbor_intf,
+ bgp_neighbor,
+ )
topotest.interface_set_status(
- router_list[bgp_neighbor], neighbor_intf,
- ifaceaction=True)
+ router_list[bgp_neighbor], neighbor_intf, ifaceaction=True
+ )
# Verifying BGP neighborship is intact in
# (holddown - keepalive) time
- for timer in range(keepalivetimer, holddowntimer,
- int(holddowntimer / 3)):
+ for timer in range(
+ keepalivetimer, holddowntimer, int(holddowntimer / 3)
+ ):
logger.info("Waiting for %s sec..", keepalivetimer)
sleep(keepalivetimer)
sleep(2)
- show_bgp_json = \
- run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(
+ rnode, "show bgp summary json", isjson=True
+ )
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
@@ -1162,17 +1184,22 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
- if timer == \
- (holddowntimer - keepalivetimer):
+ if timer == (holddowntimer - keepalivetimer):
if nh_state != "Established":
- errormsg = "BGP neighborship has not gone " \
- "down in {} sec for neighbor {}" \
- .format(timer, bgp_neighbor)
+ errormsg = (
+ "BGP neighborship has not gone "
+ "down in {} sec for neighbor {}".format(
+ timer, bgp_neighbor
+ )
+ )
return errormsg
else:
- logger.info("BGP neighborship is intact in %s"
- " sec for neighbor %s",
- timer, bgp_neighbor)
+ logger.info(
+ "BGP neighborship is intact in %s"
+ " sec for neighbor %s",
+ timer,
+ bgp_neighbor,
+ )
####################
# Shutting down peer interface and verifying that BGP
@@ -1180,27 +1207,36 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
####################
logger.info("=" * 20)
logger.info("Scenario 2:")
- logger.info("Shutdown peer interface: %s and verify BGP"
- " neighborship has gone down in hold down "
- "time %s sec", neighbor_intf, holddowntimer)
+ logger.info(
+ "Shutdown peer interface: %s and verify BGP"
+ " neighborship has gone down in hold down "
+ "time %s sec",
+ neighbor_intf,
+ holddowntimer,
+ )
logger.info("=" * 20)
- logger.info("Shutting down interface %s on router %s..",
- neighbor_intf, bgp_neighbor)
- topotest.interface_set_status(router_list[bgp_neighbor],
- neighbor_intf,
- ifaceaction=False)
+ logger.info(
+ "Shutting down interface %s on router %s..",
+ neighbor_intf,
+ bgp_neighbor,
+ )
+ topotest.interface_set_status(
+ router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
+ )
# Verifying BGP neighborship is going down in holddown time
- for timer in range(keepalivetimer,
- (holddowntimer + keepalivetimer),
- int(holddowntimer / 3)):
+ for timer in range(
+ keepalivetimer,
+ (holddowntimer + keepalivetimer),
+ int(holddowntimer / 3),
+ ):
logger.info("Waiting for %s sec..", keepalivetimer)
sleep(keepalivetimer)
sleep(2)
- show_bgp_json = \
- run_frr_cmd(rnode, "show bgp summary json",
- isjson=True)
+ show_bgp_json = run_frr_cmd(
+ rnode, "show bgp summary json", isjson=True
+ )
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
@@ -1211,22 +1247,29 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
if timer == holddowntimer:
if nh_state == "Established":
- errormsg = "BGP neighborship has not gone " \
- "down in {} sec for neighbor {}" \
- .format(timer, bgp_neighbor)
+ errormsg = (
+ "BGP neighborship has not gone "
+ "down in {} sec for neighbor {}".format(
+ timer, bgp_neighbor
+ )
+ )
return errormsg
else:
- logger.info("BGP neighborship has gone down in"
- " %s sec for neighbor %s",
- timer, bgp_neighbor)
+ logger.info(
+ "BGP neighborship has gone down in"
+ " %s sec for neighbor %s",
+ timer,
+ bgp_neighbor,
+ )
logger.debug("Exiting lib API: verify_bgp_timers_and_functionality()")
return True
@retry(attempts=3, wait=4, return_is_str=True)
-def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
- input_dict, seq_id=None):
+def verify_bgp_attributes(
+ tgen, addr_type, dut, static_routes, rmap_name, input_dict, seq_id=None
+):
"""
API will verify BGP attributes set by Route-map for given prefix and
DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
@@ -1288,7 +1331,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
if router != dut:
continue
- logger.info('Verifying BGP set attributes for dut {}:'.format(router))
+ logger.info("Verifying BGP set attributes for dut {}:".format(router))
for static_route in static_routes:
cmd = "show bgp {} {} json".format(addr_type, static_route)
@@ -1297,8 +1340,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
dict_to_test = []
tmp_list = []
for rmap_router in input_dict.keys():
- for rmap, values in input_dict[rmap_router][
- "route_maps"].items():
+ for rmap, values in input_dict[rmap_router]["route_maps"].items():
if rmap == rmap_name:
dict_to_test = values
for rmap_dict in values:
@@ -1307,8 +1349,7 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
seq_id = [seq_id]
if "seq_id" in rmap_dict:
- rmap_seq_id = \
- rmap_dict["seq_id"]
+ rmap_seq_id = rmap_dict["seq_id"]
for _seq_id in seq_id:
if _seq_id == rmap_seq_id:
tmp_list.append(rmap_dict)
@@ -1318,55 +1359,56 @@ def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
for rmap_dict in dict_to_test:
if "set" in rmap_dict:
for criteria in rmap_dict["set"].keys():
- if criteria not in show_bgp_json[
- "paths"][0]:
- errormsg = ("BGP attribute: {}"
- " is not found in"
- " cli: {} output "
- "in router {}".
- format(criteria,
- cmd,
- router))
+ if criteria not in show_bgp_json["paths"][0]:
+ errormsg = (
+ "BGP attribute: {}"
+ " is not found in"
+ " cli: {} output "
+ "in router {}".format(criteria, cmd, router)
+ )
return errormsg
- if rmap_dict["set"][criteria] == \
- show_bgp_json["paths"][0][
- criteria]:
- logger.info("Verifying BGP "
- "attribute {} for"
- " route: {} in "
- "router: {}, found"
- " expected value:"
- " {}".
- format(criteria,
- static_route,
- dut,
- rmap_dict[
- "set"][
- criteria]))
+ if (
+ rmap_dict["set"][criteria]
+ == show_bgp_json["paths"][0][criteria]
+ ):
+ logger.info(
+ "Verifying BGP "
+ "attribute {} for"
+ " route: {} in "
+ "router: {}, found"
+ " expected value:"
+ " {}".format(
+ criteria,
+ static_route,
+ dut,
+ rmap_dict["set"][criteria],
+ )
+ )
else:
- errormsg = \
- ("Failed: Verifying BGP "
- "attribute {} for route:"
- " {} in router: {}, "
- " expected value: {} but"
- " found: {}".
- format(criteria,
- static_route,
- dut,
- rmap_dict["set"]
- [criteria],
- show_bgp_json[
- 'paths'][
- 0][criteria]))
+ errormsg = (
+ "Failed: Verifying BGP "
+ "attribute {} for route:"
+ " {} in router: {}, "
+ " expected value: {} but"
+ " found: {}".format(
+ criteria,
+ static_route,
+ dut,
+ rmap_dict["set"][criteria],
+ show_bgp_json["paths"][0][criteria],
+ )
+ )
return errormsg
logger.debug("Exiting lib API: verify_bgp_attributes()")
return True
+
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
-def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
- attribute):
+def verify_best_path_as_per_bgp_attribute(
+ tgen, addr_type, router, input_dict, attribute
+):
"""
API is to verify best path according to BGP attributes for given routes.
"show bgp ipv4/6 json" command will be run and verify best path according
@@ -1445,38 +1487,36 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
# AS_PATH attribute
if attribute == "path":
# Find next_hop for the route have minimum as_path
- _next_hop = min(attribute_dict, key=lambda x: len(set(
- attribute_dict[x])))
+ _next_hop = min(
+ attribute_dict, key=lambda x: len(set(attribute_dict[x]))
+ )
compare = "SHORTEST"
# LOCAL_PREF attribute
elif attribute == "locPrf":
# Find next_hop for the route have highest local preference
- _next_hop = max(attribute_dict, key=(lambda k:
- attribute_dict[k]))
+ _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "HIGHEST"
# WEIGHT attribute
elif attribute == "weight":
# Find next_hop for the route have highest weight
- _next_hop = max(attribute_dict, key=(lambda k:
- attribute_dict[k]))
+ _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "HIGHEST"
# ORIGIN attribute
elif attribute == "origin":
# Find next_hop for the route have IGP as origin, -
# - rule is IGP>EGP>INCOMPLETE
- _next_hop = [key for (key, value) in
- attribute_dict.iteritems()
- if value == "IGP"][0]
+ _next_hop = [
+ key for (key, value) in attribute_dict.iteritems() if value == "IGP"
+ ][0]
compare = ""
# MED attribute
elif attribute == "metric":
# Find next_hop for the route have LOWEST MED
- _next_hop = min(attribute_dict, key=(lambda k:
- attribute_dict[k]))
+ _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "LOWEST"
# Show ip route
@@ -1489,8 +1529,7 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
# Verifying output dictionary rib_routes_json is not empty
if not bool(rib_routes_json):
- errormsg = "No route found in RIB of router {}..". \
- format(router)
+ errormsg = "No route found in RIB of router {}..".format(router)
return errormsg
st_found = False
@@ -1499,31 +1538,41 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
if route in rib_routes_json:
st_found = True
# Verify next_hop in rib_routes_json
- if rib_routes_json[route][0]["nexthops"][0]["ip"] in \
- attribute_dict:
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] in attribute_dict:
nh_found = True
else:
- errormsg = "Incorrect Nexthop for BGP route {} in " \
- "RIB of router {}, Expected: {}, Found:" \
- " {}\n".format(route, router,
- rib_routes_json[route][0][
- "nexthops"][0]["ip"],
- _next_hop)
+ errormsg = (
+ "Incorrect Nexthop for BGP route {} in "
+ "RIB of router {}, Expected: {}, Found:"
+ " {}\n".format(
+ route,
+ router,
+ rib_routes_json[route][0]["nexthops"][0]["ip"],
+ _next_hop,
+ )
+ )
return errormsg
if st_found and nh_found:
logger.info(
"Best path for prefix: %s with next_hop: %s is "
"installed according to %s %s: (%s) in RIB of "
- "router %s", route, _next_hop, compare,
- attribute, attribute_dict[_next_hop], router)
+ "router %s",
+ route,
+ _next_hop,
+ compare,
+ attribute,
+ attribute_dict[_next_hop],
+ router,
+ )
logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()")
return True
-def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
- attribute):
+def verify_best_path_as_per_admin_distance(
+ tgen, addr_type, router, input_dict, attribute
+):
"""
API is to verify best path according to admin distance for given
route. "show ip/ipv6 route json" command will be run and verify
@@ -1574,7 +1623,8 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
for routes_from_router in input_dict.keys():
sh_ip_route_json = router_list[routes_from_router].vtysh_cmd(
- command, isjson=True)
+ command, isjson=True
+ )
networks = input_dict[routes_from_router]["static_routes"]
for network in networks:
route = network["network"]
@@ -1590,8 +1640,7 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
attribute_dict[next_hop_ip] = route_attribute["distance"]
# Find next_hop for the route have LOWEST Admin Distance
- _next_hop = min(attribute_dict, key=(lambda k:
- attribute_dict[k]))
+ _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "LOWEST"
# Show ip route
@@ -1608,21 +1657,25 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
if route in rib_routes_json:
st_found = True
# Verify next_hop in rib_routes_json
- if rib_routes_json[route][0]["nexthops"][0]["ip"] == \
- _next_hop:
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] == _next_hop:
nh_found = True
else:
- errormsg = ("Nexthop {} is Missing for BGP route {}"
- " in RIB of router {}\n".format(_next_hop,
- route, router))
+ errormsg = (
+ "Nexthop {} is Missing for BGP route {}"
+ " in RIB of router {}\n".format(_next_hop, route, router)
+ )
return errormsg
if st_found and nh_found:
- logger.info("Best path for prefix: %s is installed according"
- " to %s %s: (%s) in RIB of router %s", route,
- compare, attribute,
- attribute_dict[_next_hop], router)
-
- logger.info(
- "Exiting lib API: verify_best_path_as_per_admin_distance()")
+ logger.info(
+ "Best path for prefix: %s is installed according"
+ " to %s %s: (%s) in RIB of router %s",
+ route,
+ compare,
+ attribute,
+ attribute_dict[_next_hop],
+ router,
+ )
+
+ logger.info("Exiting lib API: verify_best_path_as_per_admin_distance()")
return True