diff options
| author | Ashish Pant <ashish12pant@gmail.com> | 2019-06-25 06:33:09 +0530 |
|---|---|---|
| committer | Ashish Pant <ashish12pant@gmail.com> | 2019-07-09 10:26:53 +0530 |
| commit | cb6b1d9058aacc6fb815fa4e837ceb8225702e8c (patch) | |
| tree | 10fff885cd633dfe852636b064797ed68826ae80 /tests/topotests/lib/common_config.py | |
| parent | 77ef1af6e373668c0a22a6d4fbd716dc9b10498e (diff) | |
tests: Adding 5 test cases to bgp-basic suite
Signed-off-by: Ashish Pant <ashish12pant@gmail.com>
Adding test cases and verfication API for new test cases
Diffstat (limited to 'tests/topotests/lib/common_config.py')
| -rw-r--r-- | tests/topotests/lib/common_config.py | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index 78d8d022ad..99825dd2ad 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -20,6 +20,7 @@ from collections import OrderedDict from datetime import datetime +from time import sleep import StringIO import os import ConfigParser @@ -1095,3 +1096,253 @@ def create_route_maps(tgen, input_dict, build=False): logger.debug("Exiting lib API: create_prefix_lists()") return result + +############################################# +# Verification APIs +############################################# +def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): + """ + Data will be read from input_dict or input JSON file, API will generate + same prefixes, which were redistributed by either create_static_routes() or + advertise_networks_using_network_command() and do will verify next_hop and + each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json" + command o/p. + + Parameters + ---------- + * `tgen` : topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test, for which user wants to test the data + * `input_dict` : input dict, has details of static routes + * `next_hop`[optional]: next_hop which needs to be verified, + default: static + * `protocol`[optional]: protocol, default = None + + Usage + ----- + # RIB can be verified for static routes OR network advertised using + network command. Following are input_dicts to create static routes + and advertise networks using network command. Any one of the input_dict + can be passed to verify_rib() to verify routes in DUT"s RIB. + + # Creating static routes for r1 + input_dict = { + "r1": { + "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \ + "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}] + }} + # Advertising networks using network command in router r1 + input_dict = { + "r1": { + "advertise_networks": [{"start_ip": "20.0.0.0/32", + "no_of_network": 10}, + {"start_ip": "30.0.0.0/32"}] + }} + # Verifying ipv4 routes in router r1 learned via BGP + dut = "r2" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_rib()") + + router_list = tgen.routers() + for routerInput in input_dict.keys(): + for router, rnode in router_list.iteritems(): + if router != dut: + continue + + # Verifying RIB routes + if addr_type == "ipv4": + if protocol: + command = "show ip route {} json".format(protocol) + else: + command = "show ip route json" + else: + if protocol: + command = "show ipv6 route {} json".format(protocol) + else: + command = "show ipv6 route json" + + sleep(2) + logger.info("Checking router %s RIB:", router) + rib_routes_json = rnode.vtysh_cmd(command, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) is False: + errormsg = "No {} route found in rib of router {}..". \ + format(protocol, router) + return errormsg + + if "static_routes" in input_dict[routerInput]: + static_routes = input_dict[routerInput]["static_routes"] + st_found = False + nh_found = False + found_routes = [] + missing_routes = [] + for static_route in static_routes: + network = static_route["network"] + if "no_of_ip" in static_route: + no_of_ip = static_route["no_of_ip"] + else: + no_of_ip = 0 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_ip) + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + if st_rt in rib_routes_json: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if type(next_hop) is not list: + next_hop = [next_hop] + + found_hops = [rib_r["ip"] for rib_r in + rib_routes_json[st_rt][0][ + "nexthops"]] + for nh in next_hop: + nh_found = False + if nh and nh in found_hops: + nh_found = True + else: + errormsg = ("Nexthop {} is Missing for {}" + " route {} in RIB of router" + " {}\n".format(next_hop, + protocol, + st_rt, dut)) + + return errormsg + else: + missing_routes.append(st_rt) + if nh_found: + logger.info("Found next_hop %s for all routes in RIB of" + " router %s\n", next_hop, dut) + + if not st_found and len(missing_routes) > 0: + errormsg = "Missing route in RIB of router {}, routes: " \ + "{}\n".format(dut, missing_routes) + return errormsg + + logger.info("Verified routes in router %s RIB, found routes" + " are: %s\n", dut, found_routes) + + advertise_network = input_dict[routerInput].setdefault( + "advertise_networks", {}) + if advertise_network: + found_routes = [] + missing_routes = [] + found = False + for advertise_network_dict in advertise_network: + start_ip = advertise_network_dict["network"] + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 0 + + # Generating IPs for verification + ip_list = generate_ips(start_ip, no_of_network) + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + if st_rt in rib_routes_json: + found = True + found_routes.append(st_rt) + else: + missing_routes.append(st_rt) + + if not found and len(missing_routes) > 0: + errormsg = "Missing route in RIB of router {}, are: {}" \ + " \n".format(dut, missing_routes) + return errormsg + + logger.info("Verified routes in router %s RIB, found routes" + " are: %s", dut, found_routes) + + logger.info("Exiting lib API: verify_rib()") + return True + + +def verify_admin_distance_for_static_routes(tgen, input_dict): + """ + API to verify admin distance for static routes as defined in input_dict/ + input JSON by running show ip/ipv6 route json command. + + Parameter + --------- + * `tgen` : topogen object + * `input_dict`: having details like - for which router and static routes + admin dsitance needs to be verified + Usage + ----- + # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop + 10.0.0.2 in router r1 + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "admin_distance": 10, + "next_hop": "10.0.0.2" + }] + } + } + result = verify_admin_distance_for_static_routes(tgen, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_admin_distance_for_static_routes()") + + for router in input_dict.keys(): + if router not in tgen.routers(): + continue + + rnode = tgen.routers()[router] + + for static_route in input_dict[router]["static_routes"]: + addr_type = validate_ip_address(static_route["network"]) + # Command to execute + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + show_ip_route_json = rnode.vtysh_cmd(command, isjson=True) + + logger.info("Verifying admin distance for static route %s" + " under dut %s:", static_route, router) + network = static_route["network"] + next_hop = static_route["next_hop"] + admin_distance = static_route["admin_distance"] + route_data = show_ip_route_json[network][0] + if network in show_ip_route_json: + if route_data["nexthops"][0]["ip"] == next_hop: + if route_data["distance"] != admin_distance: + errormsg = ("Verification failed: admin distance" + " for static route {} under dut {}," + " found:{} but expected:{}". + format(static_route, router, + route_data["distance"], + admin_distance)) + return errormsg + else: + logger.info("Verification successful: admin" + " distance for static route %s under" + " dut %s, found:%s", static_route, + router, route_data["distance"]) + + else: + errormsg = ("Static route {} not found in " + "show_ip_route_json for dut {}". + format(network, router)) + return errormsg + + logger.info("Exiting lib API: verify_admin_distance_for_static_routes()") + return True |
