summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/common_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--tests/topotests/lib/common_config.py287
1 files changed, 26 insertions, 261 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index d83f946c42..6c24b6ddbb 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -1150,6 +1150,9 @@ def generate_ips(network, no_of_ips):
if "/" in start_ipaddr:
start_ip = start_ipaddr.split("/")[0]
mask = int(start_ipaddr.split("/")[1])
+ else:
+ logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr))
+ assert(0)
addr_type = validate_ip_address(start_ip)
if addr_type == "ipv4":
@@ -2559,6 +2562,7 @@ def verify_rib(
tag=None,
metric=None,
fib=None,
+ count_only=False
):
"""
Data will be read from input_dict or input JSON file, API will generate
@@ -2576,6 +2580,8 @@ def verify_rib(
* `next_hop`[optional]: next_hop which needs to be verified,
default: static
* `protocol`[optional]: protocol, default = None
+ * `count_only`[optional]: count of nexthops only, not specific addresses,
+ default = False
Usage
-----
@@ -2739,7 +2745,23 @@ def verify_rib(
for rib_r in rib_routes_json[st_rt][0]["nexthops"]
]
- if found_hops:
+ # Check only the count of nexthops
+ if count_only:
+ if len(next_hop) == len(found_hops):
+ nh_found = True
+ else:
+ errormsg = (
+ "Nexthops are missing for "
+ "route {} in RIB of router {}: "
+ "expected {}, found {}\n".format(
+ st_rt, dut, len(next_hop),
+ len(found_hops)
+ )
+ )
+ return errormsg
+
+ # Check the actual nexthops
+ elif found_hops:
missing_list_of_nexthops = set(
found_hops
).difference(next_hop)
@@ -2846,7 +2868,7 @@ def verify_rib(
for advertise_network_dict in advertise_network:
if "vrf" in advertise_network_dict:
- cmd = "{} vrf {} json".format(command, static_route["vrf"])
+ cmd = "{} vrf {} json".format(command, advertise_network_dict["vrf"])
else:
cmd = "{} json".format(command)
@@ -2925,264 +2947,7 @@ def verify_rib(
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
-
-@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
-def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
- """
- Data will be read from input_dict or input JSON file, API will generate
- same prefixes, which were redistributed by either create_static_routes() or
- advertise_networks_using_network_command() and will verify next_hop and
- each prefix/routes is present in "show ip/ipv6 fib json"
- command o/p.
-
- Parameters
- ----------
- * `tgen` : topogen object
- * `addr_type` : ip type, ipv4/ipv6
- * `dut`: Device Under Test, for which user wants to test the data
- * `input_dict` : input dict, has details of static routes
- * `next_hop`[optional]: next_hop which needs to be verified,
- default: static
-
- Usage
- -----
- input_routes_r1 = {
- "r1": {
- "static_routes": [{
- "network": ["1.1.1.1/32],
- "next_hop": "Null0",
- "vrf": "RED"
- }]
- }
- }
- result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
-
- Returns
- -------
- errormsg(str) or True
- """
-
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
-
- router_list = tgen.routers()
- for routerInput in input_dict.keys():
- for router, rnode in router_list.items():
- if router != dut:
- continue
-
- logger.info("Checking router %s FIB routes:", router)
-
- # Verifying RIB routes
- if addr_type == "ipv4":
- command = "show ip fib"
- else:
- command = "show ipv6 fib"
-
- found_routes = []
- missing_routes = []
-
- if "static_routes" in input_dict[routerInput]:
- static_routes = input_dict[routerInput]["static_routes"]
-
- for static_route in static_routes:
- if "vrf" in static_route and static_route["vrf"] is not None:
-
- logger.info(
- "[DUT: {}]: Verifying routes for VRF:"
- " {}".format(router, static_route["vrf"])
- )
-
- cmd = "{} vrf {}".format(command, static_route["vrf"])
-
- else:
- cmd = "{}".format(command)
-
- cmd = "{} json".format(cmd)
-
- rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
-
- # Verifying output dictionary rib_routes_json is not empty
- if bool(rib_routes_json) is False:
- errormsg = "[DUT: {}]: No route found in fib".format(router)
- return errormsg
-
- network = static_route["network"]
- if "no_of_ip" in static_route:
- no_of_ip = static_route["no_of_ip"]
- else:
- no_of_ip = 1
-
- # Generating IPs for verification
- ip_list = generate_ips(network, no_of_ip)
- st_found = False
- nh_found = False
-
- for st_rt in ip_list:
- st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
- # st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
-
- _addr_type = validate_ip_address(st_rt)
- if _addr_type != addr_type:
- continue
-
- if st_rt in rib_routes_json:
- st_found = True
- found_routes.append(st_rt)
-
- if next_hop:
- if type(next_hop) is not list:
- next_hop = [next_hop]
-
- count = 0
- for nh in next_hop:
- for nh_dict in rib_routes_json[st_rt][0][
- "nexthops"
- ]:
- if nh_dict["ip"] != nh:
- continue
- else:
- count += 1
-
- if count == len(next_hop):
- nh_found = True
- else:
- missing_routes.append(st_rt)
- errormsg = (
- "Nexthop {} is Missing"
- " for route {} in "
- "RIB of router {}\n".format(
- next_hop, st_rt, dut
- )
- )
- return errormsg
-
- else:
- missing_routes.append(st_rt)
-
- if len(missing_routes) > 0:
- errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
- dut, missing_routes
- )
- return errormsg
-
- if nh_found:
- logger.info(
- "Found next_hop {} for all routes in RIB"
- " of router {}\n".format(next_hop, dut)
- )
-
- if found_routes:
- logger.info(
- "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
- dut,
- found_routes,
- )
-
- continue
-
- if "bgp" in input_dict[routerInput]:
- if (
- "advertise_networks"
- not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
- "unicast"
- ]
- ):
- continue
-
- found_routes = []
- missing_routes = []
- advertise_network = input_dict[routerInput]["bgp"]["address_family"][
- addr_type
- ]["unicast"]["advertise_networks"]
-
- # Continue if there are no network advertise
- if len(advertise_network) == 0:
- continue
-
- for advertise_network_dict in advertise_network:
- if "vrf" in advertise_network_dict:
- cmd = "{} vrf {} json".format(command, static_route["vrf"])
- else:
- cmd = "{} json".format(command)
-
- rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
-
- # Verifying output dictionary rib_routes_json is not empty
- if bool(rib_routes_json) is False:
- errormsg = "No route found in rib of router {}..".format(router)
- return errormsg
-
- start_ip = advertise_network_dict["network"]
- if "no_of_network" in advertise_network_dict:
- no_of_network = advertise_network_dict["no_of_network"]
- else:
- no_of_network = 1
-
- # Generating IPs for verification
- ip_list = generate_ips(start_ip, no_of_network)
- st_found = False
- nh_found = False
-
- for st_rt in ip_list:
- # st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
- st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
-
- _addr_type = validate_ip_address(st_rt)
- if _addr_type != addr_type:
- continue
-
- if st_rt in rib_routes_json:
- st_found = True
- found_routes.append(st_rt)
-
- if next_hop:
- if type(next_hop) is not list:
- next_hop = [next_hop]
-
- count = 0
- for nh in next_hop:
- for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
- if nh_dict["ip"] != nh:
- continue
- else:
- count += 1
-
- if count == len(next_hop):
- nh_found = True
- else:
- missing_routes.append(st_rt)
- errormsg = (
- "Nexthop {} is Missing"
- " for route {} in "
- "RIB of router {}\n".format(next_hop, st_rt, dut)
- )
- return errormsg
- else:
- missing_routes.append(st_rt)
-
- if len(missing_routes) > 0:
- errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
- dut, missing_routes
- )
- return errormsg
-
- if nh_found:
- logger.info(
- "Found next_hop {} for all routes in RIB"
- " of router {}\n".format(next_hop, dut)
- )
-
- if found_routes:
- logger.info(
- "[DUT: {}]: Verified routes FIB"
- ", found routes are: {}\n".format(dut, found_routes)
- )
-
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
- return True
-
-
-@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+@retry(attempts=6, wait=2, return_is_str=True)
def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
"""
Data will be read from input_dict or input JSON file, API will generate
@@ -3633,7 +3398,7 @@ def verify_route_maps(tgen, input_dict):
return True
-@retry(attempts=3, wait=4, return_is_str=True)
+@retry(attempts=4, wait=4, return_is_str=True)
def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
"""
API to veiryf BGP large community is attached in route for any given