@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
-def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None,
-aspath=None, multi_nh=None):
+def verify_bgp_rib(
+ tgen, addr_type, dut, input_dict, next_hop=None, aspath=None, multi_nh=None
+):
"""
This API is to verify whether bgp rib has any
matching route for a nexthop.
if not isinstance(next_hop, list):
next_hop = [next_hop]
list1 = next_hop
- found_hops = [rib_r["ip"] for rib_r in
- rib_routes_json["routes"][
- st_rt][0]["nexthops"]]
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in rib_routes_json["routes"][st_rt][0][
+ "nexthops"
+ ]
+ ]
list2 = found_hops
- missing_list_of_nexthops = \
- set(list2).difference(list1)
- additional_nexthops_in_required_nhs = \
- set(list1).difference(list2)
+ missing_list_of_nexthops = set(list2).difference(list1)
+ additional_nexthops_in_required_nhs = set(
+ list1
+ ).difference(list2)
if list2:
if additional_nexthops_in_required_nhs:
- logger.info("Missing nexthop %s for route"\
- " %s in RIB of router %s\n", \
- additional_nexthops_in_required_nhs, \
- st_rt, dut)
- errormsg=("Nexthop {} is Missing for "\
- "route {} in RIB of router {}\n".format(
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
- st_rt, dut))
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
return errormsg
else:
nh_found = True
return errormsg
if rt == "auto":
+ vni_dict = {}
logger.info(
"[DUT: %s]: Verifying auto-rt value for " "evpn route %s:",
dut,
)
if rt_peer:
- vni_dict = {}
-
rnode = tgen.routers()[rt_peer]
show_bgp_json = run_frr_cmd(
rnode, "show bgp vrf all summary json", isjson=True
router_list = tgen.routers()
# Start daemons
- result = router_list[router].startDaemons(daemons)
- return result
+ res = router_list[router].startDaemons(daemons)
except Exception as e:
errormsg = traceback.format_exc()
logger.error(errormsg)
- return errormsg
+ res = errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
- return True
+ return res
def kill_mininet_routers_process(tgen):
return ipaddress_list
start_ip = ipaddress.IPv4Address(unicode(start_ip))
step = 2 ** (32 - mask)
- if addr_type == "ipv6":
+ elif addr_type == "ipv6":
if start_ip == "0::0" and mask == 0 and no_of_ips == 1:
ipaddress_list.append("{}/{}".format(start_ip, mask))
return ipaddress_list
start_ip = ipaddress.IPv6Address(unicode(start_ip))
step = 2 ** (128 - mask)
+ else:
+ return []
next_ip = start_ip
count = 0