from time import sleep
import traceback
import ipaddr
+import os
from lib import topotest
-
from lib.topolog import logger
# Import common_config to use commomnly used APIs
generate_ips,
validate_ip_address,
find_interface_with_greater_ip,
- run_frr_cmd,
+ run_frr_cmd, FRRCFG_FILE,
retry,
)
-BGP_CONVERGENCE_TIMEOUT = 10
-
+LOGDIR = "/tmp/topotests/"
+TMPDIR = None
-def create_router_bgp(tgen, topo, input_dict=None, build=False):
+def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True):
"""
API to configure bgp on router
"bgp": {
"local_as": "200",
"router_id": "22.22.22.22",
+ "graceful-restart": {
+ "graceful-restart": True,
+ "preserve-fw-state": True,
+ "timer": {
+ "restart-time": 300,
+ "rib-stale-time": 300,
+ "select-defer-time": 300,
+ }
+ },
"address_family": {
"ipv4": {
"unicast": {
"allowas-in": {
"number_occurences":2
},
+ "graceful-restart": True",
"prefix_lists": [
{
"name": "pf_list_1",
"""
logger.debug("Entering lib API: create_router_bgp()")
result = False
+
+ # Flag is used when testing ipv6 over ipv4 or vice-versa
+ afi_test = False
+
if not input_dict:
input_dict = deepcopy(topo)
else:
if neigh_unicast:
data_all_bgp = __create_bgp_unicast_neighbor(
- tgen, topo, input_dict, router, config_data=data_all_bgp
+ tgen, topo, input_dict, router, afi_test,
+ config_data=data_all_bgp
)
try:
result = create_common_configuration(
- tgen, router, data_all_bgp, "bgp", build
+ tgen, router, data_all_bgp, "bgp", build, load_config
)
except InvalidCLIError:
# Traceback
config_data.append("bgp router-id {}".format(router_id))
config_data.append("no bgp network import-check")
+
+ if "graceful-restart" in bgp_data:
+ graceful_config = bgp_data["graceful-restart"]
+
+ graceful_restart = graceful_config.setdefault("graceful-restart", None)
+
+ graceful_restart_disable = graceful_config.setdefault(
+ "graceful-restart-disable", None
+ )
+
+ preserve_fw_state = graceful_config.setdefault("preserve-fw-state", None)
+
+ disable_eor = graceful_config.setdefault("disable-eor", None)
+
+ if graceful_restart == False:
+ cmd = "no bgp graceful-restart"
+ if graceful_restart:
+ cmd = "bgp graceful-restart"
+
+ if graceful_restart is not None:
+ config_data.append(cmd)
+
+ if graceful_restart_disable == False:
+ cmd = "no bgp graceful-restart-disable"
+ if graceful_restart_disable:
+ cmd = "bgp graceful-restart-disable"
+
+ if graceful_restart_disable is not None:
+ config_data.append(cmd)
+
+ if preserve_fw_state == False:
+ cmd = "no bgp graceful-restart preserve-fw-state"
+ if preserve_fw_state:
+ cmd = "bgp graceful-restart preserve-fw-state"
+
+ if preserve_fw_state is not None:
+ config_data.append(cmd)
+
+ if disable_eor == False:
+ cmd = "no bgp graceful-restart disable-eor"
+ if disable_eor:
+ cmd = "bgp graceful-restart disable-eor"
+
+ if disable_eor is not None:
+ config_data.append(cmd)
+
+ if "timer" in bgp_data["graceful-restart"]:
+ timer = bgp_data["graceful-restart"]["timer"]
+
+ if "delete" in timer:
+ del_action = timer["delete"]
+ else:
+ del_action = False
+
+ for rs_timer, value in timer.items():
+ rs_timer_value = timer.setdefault(rs_timer, None)
+
+ if rs_timer_value and rs_timer != "delete":
+ cmd = "bgp graceful-restart {} {}".format(rs_timer, rs_timer_value)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
+ logger.debug("Exiting lib API: create_bgp_global()")
return config_data
-def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, config_data=None):
+def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, afi_test,
+ config_data=None):
"""
Helper API to create configuration for address-family unicast
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `router` : router id to be configured.
+ * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
* `build` : Only for initial setup phase this is set as True.
"""
if not addr_dict:
continue
- if not check_address_types(addr_type):
+ if not check_address_types(addr_type) and not afi_test:
continue
addr_data = addr_dict["unicast"]
hold_down = peer.setdefault("holddowntimer", 180)
password = peer.setdefault("password", None)
max_hop_limit = peer.setdefault("ebgp_multihop", 1)
+ graceful_restart = peer.setdefault("graceful-restart", None)
+ graceful_restart_helper = peer.setdefault("graceful-restart-helper", None)
+ graceful_restart_disable = peer.setdefault("graceful-restart-disable", None)
if update_source:
config_data.append(
config_data.append(
"{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
)
+
+ if graceful_restart:
+ config_data.append("{} graceful-restart".format(neigh_cxt))
+ elif graceful_restart == False:
+ config_data.append("no {} graceful-restart".format(neigh_cxt))
+
+ if graceful_restart_helper:
+ config_data.append("{} graceful-restart-helper".format(neigh_cxt))
+ elif graceful_restart_helper == False:
+ config_data.append("no {} graceful-restart-helper".format(neigh_cxt))
+
+ if graceful_restart_disable:
+ config_data.append("{} graceful-restart-disable".format(neigh_cxt))
+ elif graceful_restart_disable == False:
+ config_data.append("no {} graceful-restart-disable".format(neigh_cxt))
+
if password:
config_data.append("{} password {}".format(neigh_cxt, password))
for peer_name, peer_dict in deepcopy(neigh_data).iteritems():
for dest_link, peer in peer_dict["dest_link"].iteritems():
deactivate = None
+ activate = None
nh_details = topo[peer_name]
+ activate_addr_family = peer.setdefault("activate", None)
+ deactivate_addr_family = peer.setdefault("deactivate", None)
# Loopback interface
if "source_link" in peer and peer["source_link"] == "lo":
for destRouterLink, data in sorted(nh_details["links"].iteritems()):
]
neigh_cxt = "neighbor {}".format(ip_addr)
- config_data.append("address-family {} unicast".format(addr_type))
- if deactivate:
- config_data.append("no neighbor {} activate".format(deactivate))
+ config_data.append("address-family {} unicast".format(
+ addr_type
+ ))
+
+ if activate_addr_family is not None:
+ config_data.append("address-family {} unicast".format(
+ activate_addr_family))
+
+ config_data.append(
+ "{} activate".format(neigh_cxt))
+
+ if deactivate and activate_addr_family is None:
+ config_data.append(
+ "no neighbor {} activate".format(deactivate))
+
+ if deactivate_addr_family is not None:
+ config_data.append("address-family {} unicast".format(
+ deactivate_addr_family))
+ config_data.append(
+ "no {} activate".format(neigh_cxt))
next_hop_self = peer.setdefault("next_hop_self", None)
send_community = peer.setdefault("send_community", None)
return config_data
+def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
+ """
+ API will save the current config to router's /etc/frr/ for BGPd
+ daemon(bgpd.conf file)
+
+ Paramters
+ ---------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : defines for which router, and which config
+ needs to be modified
+
+ Usage:
+ ------
+ # Modify graceful-restart config not to set f-bit
+ # and write to /etc/frr
+
+ # Api call to delete advertised networks
+ input_dict_2 = {
+ "r5": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "101.0.20.1/32",
+ "no_of_network": 5,
+ "delete": True
+ }
+ ],
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "5::1/128",
+ "no_of_network": 5,
+ "delete": True
+ }
+ ],
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict)
+
+ """
+
+ logger.debug("Entering lib API: modify_bgp_config_when_bgpd_down()")
+ try:
+
+ global LOGDIR
+
+ result = create_router_bgp(
+ tgen, topo, input_dict, build=False, load_config=False
+ )
+ if result is not True:
+ return result
+
+ # Copy bgp config file to /etc/frr
+ for dut in input_dict.keys():
+ router_list = tgen.routers()
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ TMPDIR = os.path.join(LOGDIR, tgen.modname)
+
+ logger.info("Delete BGP config when BGPd is down in {}".format(router))
+ # Reading the config from /tmp/topotests and
+ # copy to /etc/frr/bgpd.conf
+ cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
+ TMPDIR, router, FRRCFG_FILE
+ )
+ router_list[router].run(cmd)
+
+ except Exception as e:
+ # handle any exception
+ logger.error("Error %s occured. Arguments %s.", e.message, e.args)
+
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: modify_bgp_config_when_bgpd_down")
+ return True
+
+
#############################################
# Verification APIs
#############################################
return True
-@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
-def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
+@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
+def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer):
"""
- This API is to verify whether bgp rib has any
- matching route for a nexthop.
+ This API is to verify verify_graceful_restart configuration of DUT and
+ cross verify the same from the peer bgp routerrouter.
Parameters
----------
* `tgen`: topogen object
- * `dut`: input dut router name
+ * `topo`: input json file data
* `addr_type` : ip type ipv4/ipv6
- * `input_dict` : input dict, has details of static routes
- * `next_hop`[optional]: next_hop which needs to be verified,
- default = static
- * 'aspath'[optional]: aspath which needs to be verified
+ * `input_dict`: input dictionary, have details of Device Under Test, for
+ which user wants to test the data
+ * `dut`: input dut router name
+ * `peer`: input peer router name
Usage
-----
- dut = 'r1'
- next_hop = "192.168.1.10"
- input_dict = topo['routers']
- aspath = "100 200 300"
- result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
- next_hop, aspath)
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link":{
+ "r1": {
+ "graceful-restart": True
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link":{
+ "r1": {
+ "graceful-restart": True
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
+ dut = "r1", peer = 'r2')
Returns
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_bgp_rib()")
+ logger.debug("Entering lib API: verify_graceful_restart()")
- router_list = tgen.routers()
- additional_nexthops_in_required_nhs = []
- list1 = []
- list2 = []
- for routerInput in input_dict.keys():
- for router, rnode in router_list.iteritems():
- if router != dut:
+ for router, rnode in tgen.routers().iteritems():
+ if router != dut:
+ continue
+
+ bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
+
+ if addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
continue
- # Verifying RIB routes
- command = "show bgp"
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
- # Static routes
- sleep(2)
- logger.info("Checking router {} BGP RIB:".format(dut))
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ if bgp_neighbor != peer:
+ continue
- if "static_routes" in input_dict[routerInput]:
- static_routes = input_dict[routerInput]["static_routes"]
+ for dest_link, peer_dict in peer_data["dest_link"].items():
+ data = topo["routers"][bgp_neighbor]["links"]
- for static_route in static_routes:
- found_routes = []
- missing_routes = []
- st_found = False
- nh_found = False
- vrf = static_route.setdefault("vrf", None)
- if vrf:
- cmd = "{} vrf {} {}".format(command, vrf, addr_type)
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
- else:
- cmd = "{} {}".format(command, addr_type)
+ logger.info(
+ "[DUT: {}]: Checking bgp graceful-restart show"
+ " o/p {}".format(dut, neighbor_ip)
+ )
- cmd = "{} json".format(cmd)
+ show_bgp_graceful_json = None
- rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+ show_bgp_graceful_json = run_frr_cmd(
+ rnode,
+ "show bgp {} neighbor {} graceful-restart json".format(
+ addr_type, neighbor_ip
+ ),
+ isjson=True,
+ )
- # Verifying output dictionary rib_routes_json is not empty
- if bool(rib_routes_json) == False:
- errormsg = "No route found in rib of router {}..".format(router)
- return errormsg
+ logger.info("show_bgp_graceful_json {}".format(show_bgp_graceful_json))
- network = static_route["network"]
+ show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
- if "no_of_ip" in static_route:
- no_of_ip = static_route["no_of_ip"]
+ if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
+ logger.info(
+ "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
+ )
+ else:
+ errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(
+ dut, neighbor_ip
+ )
+ return errormsg
+
+ lmode = None
+ rmode = None
+ # Local GR mode
+ if "address_family" in input_dict[dut]["bgp"]:
+ bgp_neighbors = input_dict[dut]["bgp"]["address_family"][addr_type][
+ "unicast"
+ ]["neighbor"][peer]["dest_link"]
+
+ for dest_link, data in bgp_neighbors.items():
+ if (
+ "graceful-restart-helper" in data
+ and data["graceful-restart-helper"]
+ ):
+ lmode = "Helper"
+ elif "graceful-restart" in data and data["graceful-restart"]:
+ lmode = "Restart"
+ elif (
+ "graceful-restart-disable" in data
+ and data["graceful-restart-disable"]
+ ):
+ lmode = "Disable"
else:
- no_of_ip = 1
+ lmode = None
- # Generating IPs for verification
- ip_list = generate_ips(network, no_of_ip)
+ if lmode is None:
+ if "graceful-restart" in input_dict[dut]["bgp"]:
- for st_rt in ip_list:
- st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+ if (
+ "graceful-restart" in input_dict[dut]["bgp"]["graceful-restart"]
+ and input_dict[dut]["bgp"]["graceful-restart"][
+ "graceful-restart"
+ ]
+ ):
+ lmode = "Restart*"
+ elif (
+ "graceful-restart-disable"
+ in input_dict[dut]["bgp"]["graceful-restart"]
+ and input_dict[dut]["bgp"]["graceful-restart"][
+ "graceful-restart-disable"
+ ]
+ ):
+ lmode = "Disable*"
+ else:
+ lmode = "Helper*"
+ else:
+ lmode = "Helper*"
+
+ if lmode == "Disable" or lmode == "Disable*":
+ return True
+
+ # Remote GR mode
+ if "address_family" in input_dict[peer]["bgp"]:
+ bgp_neighbors = input_dict[peer]["bgp"]["address_family"][addr_type][
+ "unicast"
+ ]["neighbor"][dut]["dest_link"]
+
+ for dest_link, data in bgp_neighbors.items():
+ if (
+ "graceful-restart-helper" in data
+ and data["graceful-restart-helper"]
+ ):
+ rmode = "Helper"
+ elif "graceful-restart" in data and data["graceful-restart"]:
+ rmode = "Restart"
+ elif (
+ "graceful-restart-disable" in data
+ and data["graceful-restart-disable"]
+ ):
+ rmode = "Disable"
+ else:
+ rmode = None
- _addr_type = validate_ip_address(st_rt)
- if _addr_type != addr_type:
- continue
+ if rmode is None:
+ if "graceful-restart" in input_dict[peer]["bgp"]:
- if st_rt in rib_routes_json["routes"]:
- st_found = True
- found_routes.append(st_rt)
+ if (
+ "graceful-restart"
+ in input_dict[peer]["bgp"]["graceful-restart"]
+ and input_dict[peer]["bgp"]["graceful-restart"][
+ "graceful-restart"
+ ]
+ ):
+ rmode = "Restart"
+ elif (
+ "graceful-restart-disable"
+ in input_dict[peer]["bgp"]["graceful-restart"]
+ and input_dict[peer]["bgp"]["graceful-restart"][
+ "graceful-restart-disable"
+ ]
+ ):
+ rmode = "Disable"
+ else:
+ rmode = "Helper"
+ else:
+ rmode = "Helper"
- if next_hop:
- if not isinstance(next_hop, list):
- next_hop = [next_hop]
- list1 = next_hop
- found_hops = [
- rib_r["ip"]
- for rib_r in rib_routes_json["routes"][st_rt][0][
- "nexthops"
- ]
- ]
- list2 = found_hops
- missing_list_of_nexthops = set(list2).difference(list1)
- additional_nexthops_in_required_nhs = set(
- list1
- ).difference(list2)
+ if show_bgp_graceful_json_out["localGrMode"] == lmode:
+ logger.info(
+ "[DUT: {}]: localGrMode : {} ".format(
+ dut, show_bgp_graceful_json_out["localGrMode"]
+ )
+ )
+ else:
+ errormsg = (
+ "[DUT: {}]: localGrMode is not correct"
+ " Expected: {}, Found: {}".format(
+ dut, lmode, show_bgp_graceful_json_out["localGrMode"]
+ )
+ )
+ return errormsg
- if list2:
- if additional_nexthops_in_required_nhs:
- logger.info(
- "Missing nexthop %s for route"
- " %s in RIB of router %s\n",
- additional_nexthops_in_required_nhs,
- st_rt,
- dut,
- )
- errormsg = (
- "Nexthop {} is Missing for "
- "route {} in RIB of router {}\n".format(
- additional_nexthops_in_required_nhs,
- st_rt,
- dut,
- )
- )
- return errormsg
- else:
- nh_found = True
- if aspath:
- found_paths = rib_routes_json["routes"][st_rt][0][
- "path"
- ]
- if aspath == found_paths:
- aspath_found = True
- logger.info(
- "Found AS path {} for route"
- " {} in RIB of router "
- "{}\n".format(aspath, st_rt, dut)
- )
- else:
- errormsg = (
- "AS Path {} is missing for route"
- "for route {} in RIB of router {}\n".format(
- aspath, st_rt, dut
- )
- )
- return errormsg
+ if show_bgp_graceful_json_out["remoteGrMode"] == rmode:
+ logger.info(
+ "[DUT: {}]: remoteGrMode : {} ".format(
+ dut, show_bgp_graceful_json_out["remoteGrMode"]
+ )
+ )
+ elif (
+ show_bgp_graceful_json_out["remoteGrMode"] == "NotApplicable"
+ and rmode == "Disable"
+ ):
+ logger.info(
+ "[DUT: {}]: remoteGrMode : {} ".format(
+ dut, show_bgp_graceful_json_out["remoteGrMode"]
+ )
+ )
+ else:
+ errormsg = (
+ "[DUT: {}]: remoteGrMode is not correct"
+ " Expected: {}, Found: {}".format(
+ dut, rmode, show_bgp_graceful_json_out["remoteGrMode"]
+ )
+ )
+ return errormsg
- else:
- missing_routes.append(st_rt)
+ logger.debug("Exiting lib API: verify_graceful_restart()")
+ return True
- if nh_found:
+
+@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
+def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer):
+ """
+ This API is to verify r_bit in the BGP gr capability advertised
+ by the neighbor router
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict`: input dictionary, have details of Device Under Test, for
+ which user wants to test the data
+ * `dut`: input dut router name
+ * `peer`: peer name
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link":{
+ "r1": {
+ "graceful-restart": True
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link":{
+ "r1": {
+ "graceful-restart": True
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_r_bit()")
+
+ for router, rnode in tgen.routers().iteritems():
+ if router != dut:
+ continue
+
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+
+ if addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ if bgp_neighbor != peer:
+ continue
+
+ for dest_link, peer_dict in peer_data["dest_link"].items():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
+
+ logger.info(
+ "[DUT: {}]: Checking bgp graceful-restart show"
+ " o/p {}".format(dut, neighbor_ip)
+ )
+
+ show_bgp_graceful_json = run_frr_cmd(
+ rnode,
+ "show bgp {} neighbor {} graceful-restart json".format(
+ addr_type, neighbor_ip
+ ),
+ isjson=True,
+ )
+
+ show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
+
+ if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
+ logger.info(
+ "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
+ )
+ else:
+ errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(
+ dut, neighbor_ip
+ )
+ return errormsg
+
+ if "rBit" in show_bgp_graceful_json_out:
+ if show_bgp_graceful_json_out["rBit"]:
+ logger.info("[DUT: {}]: Rbit true {}".format(dut, neighbor_ip))
+ else:
+ errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip)
+ return errormsg
+
+ logger.debug("Exiting lib API: verify_r_bit()")
+ return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_eor(tgen, topo, addr_type, input_dict, dut, peer):
+ """
+ This API is to verify EOR
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict`: input dictionary, have details of DUT, for
+ which user wants to test the data
+ * `dut`: input dut router name
+ * `peer`: peer name
+ Usage
+ -----
+ input_dict = {
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link":{
+ "r1": {
+ "graceful-restart": True
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link":{
+ "r1": {
+ "graceful-restart": True
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+ logger.debug("Entering lib API: verify_eor()")
+
+ for router, rnode in tgen.routers().iteritems():
+ if router != dut:
+ continue
+
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+
+ if addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ if bgp_neighbor != peer:
+ continue
+
+ for dest_link, peer_dict in peer_data["dest_link"].items():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
+
+ logger.info(
+ "[DUT: %s]: Checking bgp graceful-restart" " show o/p %s",
+ dut,
+ neighbor_ip,
+ )
+
+ show_bgp_graceful_json = run_frr_cmd(
+ rnode,
+ "show bgp {} neighbor {} graceful-restart json".format(
+ addr_type, neighbor_ip
+ ),
+ isjson=True,
+ )
+
+ show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
+
+ if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
+ logger.info("[DUT: %s]: Neighbor ip matched %s", dut, neighbor_ip)
+ else:
+ errormsg = "[DUT: %s]: Neighbor ip is NOT matched %s" % (
+ dut,
+ neighbor_ip,
+ )
+ return errormsg
+
+ for afi, afi_data in show_bgp_graceful_json_out.items():
+ if "v4" not in afi and "v6" not in afi:
+ continue
+
+ eor_json = afi_data["endOfRibStatus"]
+ if "endOfRibSend" in eor_json:
+
+ if eor_json["endOfRibSend"]:
logger.info(
- "Found next_hop {} for all bgp"
- " routes in RIB of"
- " router {}\n".format(next_hop, router)
+ "[DUT: %s]: EOR Send true for %s " "%s",
+ dut,
+ neighbor_ip,
+ afi,
)
+ else:
+ errormsg = "[DUT: %s]: EOR Send false for %s" " %s" % (
+ dut,
+ neighbor_ip,
+ afi,
+ )
+ return errormsg
- if len(missing_routes) > 0:
- errormsg = (
- "Missing route in RIB of router {}, "
- "routes: {}\n".format(dut, missing_routes)
+ if "endOfRibRecv" in eor_json:
+ if eor_json["endOfRibRecv"]:
+ logger.info(
+ "[DUT: %s]: EOR Recv true %s " "%s", dut, neighbor_ip, afi
+ )
+ else:
+ errormsg = "[DUT: %s]: EOR Recv false %s " "%s" % (
+ dut,
+ neighbor_ip,
+ afi,
)
return errormsg
- if found_routes:
+ if "endOfRibSentAfterUpdate" in eor_json:
+ if eor_json["endOfRibSentAfterUpdate"]:
logger.info(
- "Verified routes in router {} BGP RIB, "
- "found routes are: {} \n".format(dut, found_routes)
+ "[DUT: %s]: EOR SendTime true for %s" " %s",
+ dut,
+ neighbor_ip,
+ afi,
)
- continue
+ else:
+ errormsg = "[DUT: %s]: EOR SendTime false for " "%s %s" % (
+ dut,
+ neighbor_ip,
+ afi,
+ )
+ return errormsg
- if "bgp" not in input_dict[routerInput]:
+ logger.debug("Exiting lib API: verify_eor()")
+ return True
+
+
+@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
+def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer):
+ """
+ This API is to verify f_bit in the BGP gr capability advertised
+ by the neighbor router
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict`: input dictionary, have details of Device Under Test, for
+ which user wants to test the data
+ * `dut`: input dut router name
+ * `peer`: peer name
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link":{
+ "r1": {
+ "graceful-restart": True
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link":{
+ "r1": {
+ "graceful-restart": True
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_f_bit()")
+
+ for router, rnode in tgen.routers().iteritems():
+ if router != dut:
+ continue
+
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+
+ if addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
continue
- # Advertise networks
- bgp_data_list = input_dict[routerInput]["bgp"]
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
- if type(bgp_data_list) is not list:
- bgp_data_list = [bgp_data_list]
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ if bgp_neighbor != peer:
+ continue
- for bgp_data in bgp_data_list:
- vrf_id = bgp_data.setdefault("vrf", None)
- if vrf_id:
- cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
- else:
- cmd = "{} {}".format(command, addr_type)
+ for dest_link, peer_dict in peer_data["dest_link"].items():
+ data = topo["routers"][bgp_neighbor]["links"]
- cmd = "{} json".format(cmd)
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
- rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+ logger.info(
+ "[DUT: {}]: Checking bgp graceful-restart show"
+ " o/p {}".format(dut, neighbor_ip)
+ )
- # Verifying output dictionary rib_routes_json is not empty
- if bool(rib_routes_json) == False:
- errormsg = "No route found in rib of router {}..".format(router)
- return errormsg
+ show_bgp_graceful_json = run_frr_cmd(
+ rnode,
+ "show bgp {} neighbor {} graceful-restart json".format(
+ addr_type, neighbor_ip
+ ),
+ isjson=True,
+ )
- bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
- advertise_network = bgp_net_advertise.setdefault(
- "advertise_networks", []
+ show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
+
+ if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
+ logger.info(
+ "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
)
+ else:
+ errormsg = "[DUT: {}]: Neighbor ip NOT a match {}".format(
+ dut, neighbor_ip
+ )
+ return errormsg
- for advertise_network_dict in advertise_network:
- found_routes = []
- missing_routes = []
- found = False
+ for afi, afi_data in show_bgp_graceful_json_out.items():
+ if "v4" not in afi and "v6" not in afi:
+ continue
- network = advertise_network_dict["network"]
+ if afi_data["fBit"]:
+ logger.info(
+ "[DUT: {}]: Fbit True for {} {}".format(dut, neighbor_ip, afi)
+ )
+ else:
+ errormsg = "[DUT: {}]: Fbit False for {} {}".format(
+ dut, neighbor_ip, afi
+ )
+ return errormsg
- if "no_of_network" in advertise_network_dict:
- no_of_network = advertise_network_dict["no_of_network"]
- else:
- no_of_network = 1
+ logger.debug("Exiting lib API: verify_f_bit()")
+ return True
- # Generating IPs for verification
- ip_list = generate_ips(network, no_of_network)
- for st_rt in ip_list:
- st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
+def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer):
+ """
+ This API is to verify graceful restart timers, configured and recieved
- _addr_type = validate_ip_address(st_rt)
- if _addr_type != addr_type:
- continue
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict`: input dictionary, have details of Device Under Test,
+ for which user wants to test the data
+ * `dut`: input dut router name
+ * `peer`: peer name
+ Usage
+ -----
+ # Configure graceful-restart
+ input_dict_1 = {
+ "r1": {
+ "bgp": {
+ "bgp_neighbors": {
+ "r3": {
+ "graceful-restart": "graceful-restart-helper"
+ }
+ },
+ "gracefulrestart": ["restart-time 150"]
+ }
+ },
+ "r3": {
+ "bgp": {
+ "bgp_neighbors": {
+ "r1": {
+ "graceful-restart": "graceful-restart"
+ }
+ }
+ }
+ }
+ }
- if st_rt in rib_routes_json["routes"]:
- found = True
- found_routes.append(st_rt)
- else:
- found = False
- missing_routes.append(st_rt)
+ result = verify_graceful_restart_timers(tgen, topo, 'ipv4', input_dict)
- if len(missing_routes) > 0:
- errormsg = (
- "Missing route in BGP RIB of router {},"
- " are: {}\n".format(dut, missing_routes)
- )
- return errormsg
+ Returns
+ -------
+ errormsg(str) or True
+ """
- if found_routes:
- logger.info(
- "Verified routes in router {} BGP RIB, found "
- "routes are: {}\n".format(dut, found_routes)
- )
+ logger.debug("Entering lib API: verify_graceful_restart_timers()")
- logger.debug("Exiting lib API: verify_bgp_rib()")
+ for router, rnode in tgen.routers().iteritems():
+ if router != dut:
+ continue
+
+ bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
+
+ if addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ if bgp_neighbor != peer:
+ continue
+
+ for dest_link, peer_dict in peer_data["dest_link"].items():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
+
+ logger.info(
+ "[DUT: {}]: Checking bgp graceful-restart show"
+ " o/p {}".format(dut, neighbor_ip)
+ )
+
+ show_bgp_graceful_json = run_frr_cmd(
+ rnode,
+ "show bgp {} neighbor {} graceful-restart json".format(
+ addr_type, neighbor_ip
+ ),
+ isjson=True,
+ )
+
+ show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
+ if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
+ logger.info(
+ "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
+ )
+ else:
+ errormsg = "[DUT: {}]: Neighbor ip is NOT matched {}".format(
+ dut, neighbor_ip
+ )
+ return errormsg
+
+ # Graceful-restart timer
+ if "graceful-restart" in input_dict[peer]["bgp"]:
+ if "timer" in input_dict[peer]["bgp"]["graceful-restart"]:
+ for rs_timer, value in input_dict[peer]["bgp"]["graceful-restart"][
+ "timer"
+ ].items():
+ if rs_timer == "restart-time":
+
+ receivedTimer = value
+ if (
+ show_bgp_graceful_json_out["timers"][
+ "receivedRestartTimer"
+ ]
+ == receivedTimer
+ ):
+ logger.info(
+ "receivedRestartTimer is {}"
+ " on {} from peer {}".format(
+ receivedTimer, router, peer
+ )
+ )
+ else:
+ errormsg = (
+ "receivedRestartTimer is not"
+ " as expected {}".format(receivedTimer)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: verify_graceful_restart_timers")
return True
+
+
+@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
+def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut):
+ """
+ This API is to verify gr_address_family in the BGP gr capability advertised
+ by the neighbor router
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type` : ip type ipv4/ipv6
+ * `addr_family` : address family type IPV4 Unicast/IPV6 Unicast
+ * `dut`: input dut router name
+
+ Usage
+ -----
+ result = verify_gr_address_family(tgen, topo, addr_type, addr_family,
+ , dut)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_gr_address_family()")
+
+ for router, rnode in tgen.routers().iteritems():
+ if router != dut:
+ continue
+
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+
+ if addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ for dest_link, peer_dict in peer_data["dest_link"].items():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
+
+ logger.info(
+ "[DUT: {}]: Checking bgp graceful-restart"
+ " show o/p {}".format(dut, neighbor_ip)
+ )
+
+ show_bgp_graceful_json = run_frr_cmd(
+ rnode,
+ "show bgp {} neighbor {} graceful-restart json".format(
+ addr_type, neighbor_ip
+ ),
+ isjson=True,
+ )
+
+ show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
+
+ if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
+ logger.info("Neighbor ip matched {}".format(neighbor_ip))
+ else:
+ errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
+ return errormsg
+
+ if "v4" in addr_family:
+ input_afi = "v4"
+ elif "v6" in addr_family:
+ input_afi = "v6"
+
+ for afi in show_bgp_graceful_json_out.keys():
+ if input_afi not in afi:
+ continue
+ else:
+ logger.info("{} present for {} ".format(addr_family, neighbor_ip))
+ return True
+ else:
+ errormsg = "{} NOT present for {} ".format(addr_family, neighbor_ip)
+ return errormsg
+
+ logger.debug("Exiting lib API: verify_gr_address_family()")