summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/common_config.py
diff options
context:
space:
mode:
authorAshish Pant <ashish12pant@gmail.com>2019-06-25 06:33:09 +0530
committerAshish Pant <ashish12pant@gmail.com>2019-07-09 10:26:53 +0530
commitcb6b1d9058aacc6fb815fa4e837ceb8225702e8c (patch)
tree10fff885cd633dfe852636b064797ed68826ae80 /tests/topotests/lib/common_config.py
parent77ef1af6e373668c0a22a6d4fbd716dc9b10498e (diff)
tests: Adding 5 test cases to bgp-basic suite
Signed-off-by: Ashish Pant <ashish12pant@gmail.com> Adding test cases and verfication API for new test cases
Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--tests/topotests/lib/common_config.py251
1 files changed, 251 insertions, 0 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index 78d8d022ad..99825dd2ad 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -20,6 +20,7 @@
from collections import OrderedDict
from datetime import datetime
+from time import sleep
import StringIO
import os
import ConfigParser
@@ -1095,3 +1096,253 @@ def create_route_maps(tgen, input_dict, build=False):
logger.debug("Exiting lib API: create_prefix_lists()")
return result
+
+#############################################
+# Verification APIs
+#############################################
+def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
+ """
+ Data will be read from input_dict or input JSON file, API will generate
+ same prefixes, which were redistributed by either create_static_routes() or
+ advertise_networks_using_network_command() and do will verify next_hop and
+ each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
+ command o/p.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test, for which user wants to test the data
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default: static
+ * `protocol`[optional]: protocol, default = None
+
+ Usage
+ -----
+ # RIB can be verified for static routes OR network advertised using
+ network command. Following are input_dicts to create static routes
+ and advertise networks using network command. Any one of the input_dict
+ can be passed to verify_rib() to verify routes in DUT"s RIB.
+
+ # Creating static routes for r1
+ input_dict = {
+ "r1": {
+ "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
+ "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
+ }}
+ # Advertising networks using network command in router r1
+ input_dict = {
+ "r1": {
+ "advertise_networks": [{"start_ip": "20.0.0.0/32",
+ "no_of_network": 10},
+ {"start_ip": "30.0.0.0/32"}]
+ }}
+ # Verifying ipv4 routes in router r1 learned via BGP
+ dut = "r2"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_rib()")
+
+ router_list = tgen.routers()
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ if addr_type == "ipv4":
+ if protocol:
+ command = "show ip route {} json".format(protocol)
+ else:
+ command = "show ip route json"
+ else:
+ if protocol:
+ command = "show ipv6 route {} json".format(protocol)
+ else:
+ command = "show ipv6 route json"
+
+ sleep(2)
+ logger.info("Checking router %s RIB:", router)
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No {} route found in rib of router {}..". \
+ format(protocol, router)
+ return errormsg
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+ st_found = False
+ nh_found = False
+ found_routes = []
+ missing_routes = []
+ for static_route in static_routes:
+ network = static_route["network"]
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 0
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ found_hops = [rib_r["ip"] for rib_r in
+ rib_routes_json[st_rt][0][
+ "nexthops"]]
+ for nh in next_hop:
+ nh_found = False
+ if nh and nh in found_hops:
+ nh_found = True
+ else:
+ errormsg = ("Nexthop {} is Missing for {}"
+ " route {} in RIB of router"
+ " {}\n".format(next_hop,
+ protocol,
+ st_rt, dut))
+
+ return errormsg
+ else:
+ missing_routes.append(st_rt)
+ if nh_found:
+ logger.info("Found next_hop %s for all routes in RIB of"
+ " router %s\n", next_hop, dut)
+
+ if not st_found and len(missing_routes) > 0:
+ errormsg = "Missing route in RIB of router {}, routes: " \
+ "{}\n".format(dut, missing_routes)
+ return errormsg
+
+ logger.info("Verified routes in router %s RIB, found routes"
+ " are: %s\n", dut, found_routes)
+
+ advertise_network = input_dict[routerInput].setdefault(
+ "advertise_networks", {})
+ if advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+ for advertise_network_dict in advertise_network:
+ start_ip = advertise_network_dict["network"]
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 0
+
+ # Generating IPs for verification
+ ip_list = generate_ips(start_ip, no_of_network)
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ if st_rt in rib_routes_json:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ missing_routes.append(st_rt)
+
+ if not found and len(missing_routes) > 0:
+ errormsg = "Missing route in RIB of router {}, are: {}" \
+ " \n".format(dut, missing_routes)
+ return errormsg
+
+ logger.info("Verified routes in router %s RIB, found routes"
+ " are: %s", dut, found_routes)
+
+ logger.info("Exiting lib API: verify_rib()")
+ return True
+
+
+def verify_admin_distance_for_static_routes(tgen, input_dict):
+ """
+ API to verify admin distance for static routes as defined in input_dict/
+ input JSON by running show ip/ipv6 route json command.
+
+ Parameter
+ ---------
+ * `tgen` : topogen object
+ * `input_dict`: having details like - for which router and static routes
+ admin dsitance needs to be verified
+ Usage
+ -----
+ # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
+ 10.0.0.2 in router r1
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "admin_distance": 10,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = verify_admin_distance_for_static_routes(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_admin_distance_for_static_routes()")
+
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ for static_route in input_dict[router]["static_routes"]:
+ addr_type = validate_ip_address(static_route["network"])
+ # Command to execute
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+ show_ip_route_json = rnode.vtysh_cmd(command, isjson=True)
+
+ logger.info("Verifying admin distance for static route %s"
+ " under dut %s:", static_route, router)
+ network = static_route["network"]
+ next_hop = static_route["next_hop"]
+ admin_distance = static_route["admin_distance"]
+ route_data = show_ip_route_json[network][0]
+ if network in show_ip_route_json:
+ if route_data["nexthops"][0]["ip"] == next_hop:
+ if route_data["distance"] != admin_distance:
+ errormsg = ("Verification failed: admin distance"
+ " for static route {} under dut {},"
+ " found:{} but expected:{}".
+ format(static_route, router,
+ route_data["distance"],
+ admin_distance))
+ return errormsg
+ else:
+ logger.info("Verification successful: admin"
+ " distance for static route %s under"
+ " dut %s, found:%s", static_route,
+ router, route_data["distance"])
+
+ else:
+ errormsg = ("Static route {} not found in "
+ "show_ip_route_json for dut {}".
+ format(network, router))
+ return errormsg
+
+ logger.info("Exiting lib API: verify_admin_distance_for_static_routes()")
+ return True