summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/common_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--tests/topotests/lib/common_config.py583
1 files changed, 556 insertions, 27 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index e4d72ea2d7..1846d43138 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -30,7 +30,6 @@ from functools import wraps
from re import search as re_search
from tempfile import mkdtemp
-import StringIO
import os
import sys
import ConfigParser
@@ -38,6 +37,13 @@ import traceback
import socket
import ipaddress
+if sys.version_info[0] > 2:
+ import io
+ import configparser
+else:
+ import StringIO
+ import ConfigParser as configparser
+
from lib.topolog import logger, logger_config
from lib.topogen import TopoRouter, get_topogen
from lib.topotest import interface_set_status
@@ -61,7 +67,7 @@ TMPDIR = None
# NOTE: to save execution logs to log file frrtest_log_dir must be configured
# in `pytest.ini`.
-config = ConfigParser.ConfigParser()
+config = configparser.ConfigParser()
config.read(PYTESTINI_PATH)
config_section = "topogen"
@@ -393,6 +399,14 @@ def check_router_status(tgen):
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
+def getStrIO():
+ """
+ Return a StringIO object appropriate for the current python version.
+ """
+ if sys.version_info[0] > 2:
+ return io.StringIO()
+ else:
+ return StringIO.StringIO()
def reset_config_on_routers(tgen, routerName=None):
"""
@@ -465,7 +479,7 @@ def reset_config_on_routers(tgen, routerName=None):
raise InvalidCLIError("Unknown error in %s", output)
f = open(dname, "r")
- delta = StringIO.StringIO()
+ delta = getStrIO()
delta.write("configure terminal\n")
t_delta = f.read()
@@ -499,7 +513,7 @@ def reset_config_on_routers(tgen, routerName=None):
output = router.vtysh_multicmd(delta.getvalue(), pretty_output=False)
delta.close()
- delta = StringIO.StringIO()
+ delta = getStrIO()
cfg = router.run("vtysh -c 'show running'")
for line in cfg.split("\n"):
line = line.strip()
@@ -713,8 +727,8 @@ def start_topology(tgen):
os.chdir("{}/{}".format(TMPDIR, rname))
os.system("touch zebra.conf bgpd.conf")
- except IOError as (errno, strerror):
- logger.error("I/O error({0}): {1}".format(errno, strerror))
+ except IOError as err:
+ logger.error("I/O error({0}): {1}".format(err.errno, err.strerror))
# Loading empty zebra.conf file to router, to start the zebra deamon
router.load_config(
@@ -1224,7 +1238,8 @@ def interface_status(tgen, topo, input_dict):
return True
-def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict=False):
+def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0,
+ return_is_dict=False):
"""
Retries function execution, if return is an errormsg or exception
@@ -1236,11 +1251,13 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict
"""
def _retry(func):
+
@wraps(func)
def func_retry(*args, **kwargs):
- _wait = kwargs.pop("wait", wait)
- _attempts = kwargs.pop("attempts", attempts)
+ _wait = kwargs.pop('wait', wait)
+ _attempts = kwargs.pop('attempts', attempts)
_attempts = int(_attempts)
+ expected = True
if _attempts < 0:
raise ValueError("attempts must be 0 or greater")
@@ -1248,40 +1265,40 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict
logger.info("Waiting for [%s]s as initial delay", initial_wait)
sleep(initial_wait)
- _return_is_str = kwargs.pop("return_is_str", return_is_str)
- _return_is_dict = kwargs.pop("return_is_str", return_is_dict)
+ _return_is_str = kwargs.pop('return_is_str', return_is_str)
+ _return_is_dict = kwargs.pop('return_is_str', return_is_dict)
for i in range(1, _attempts + 1):
try:
- _expected = kwargs.setdefault("expected", True)
- kwargs.pop("expected")
+ _expected = kwargs.setdefault('expected', True)
+ if _expected is False:
+ expected = _expected
+ kwargs.pop('expected')
ret = func(*args, **kwargs)
- logger.debug("Function returned %s" % ret)
+ logger.debug("Function returned %s", ret)
if _return_is_str and isinstance(ret, bool) and _expected:
return ret
- if (
- isinstance(ret, str) or isinstance(ret, unicode)
- ) and _expected is False:
+ if (isinstance(ret, str) or isinstance(ret, unicode)) and _expected is False:
return ret
if _return_is_dict and isinstance(ret, dict):
return ret
- if _attempts == i:
+ if _attempts == i and expected:
generate_support_bundle()
return ret
except Exception as err:
- if _attempts == i:
+ if _attempts == i and expected:
generate_support_bundle()
- logger.info("Max number of attempts (%r) reached", _attempts)
+ logger.info("Max number of attempts (%r) reached",
+ _attempts)
raise
else:
logger.info("Function returned %s", err)
if i < _attempts:
- logger.info("Retry [#%r] after sleeping for %ss" % (i, _wait))
+ logger.info("Retry [#%r] after sleeping for %ss"
+ % (i, _wait))
sleep(_wait)
-
func_retry._original = func
return func_retry
-
return _retry
@@ -2191,7 +2208,7 @@ def addKernelRoute(
if mask == "32" or mask == "128":
grp_addr = ip
- if not re_search(r"{}".format(grp_addr), result) and mask is not "0":
+ if not re_search(r"{}".format(grp_addr), result) and mask != "0":
errormsg = (
"[DUT: {}]: Kernal route is not added for group"
" address {} Config output: {}".format(router, grp_addr, output)
@@ -2352,7 +2369,9 @@ def configure_brctl(tgen, topo, input_dict):
):
ip_cmd_list = []
- cmd = "ip link add name {} type bridge stp_state {}".format(brctl_name, stp)
+ cmd = "ip link add name {} type bridge stp_state {}".format(
+ brctl_name, stp
+ )
logger.info("[DUT: %s]: Running command: %s", dut, cmd)
rnode.run(cmd)
@@ -2502,7 +2521,7 @@ def verify_rib(
errormsg(str) or True
"""
- logger.info("Entering lib API: verify_rib()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
@@ -2817,7 +2836,517 @@ def verify_rib(
" routes are: {}\n".format(addr_type, dut, found_routes)
)
- logger.info("Exiting lib API: verify_rib()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
+ """
+ Data will be read from input_dict or input JSON file, API will generate
+ same prefixes, which were redistributed by either create_static_routes() or
+ advertise_networks_using_network_command() and will verify next_hop and
+ each prefix/routes is present in "show ip/ipv6 fib json"
+ command o/p.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test, for which user wants to test the data
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default: static
+
+ Usage
+ -----
+ input_routes_r1 = {
+ "r1": {
+ "static_routes": [{
+ "network": ["1.1.1.1/32],
+ "next_hop": "Null0",
+ "vrf": "RED"
+ }]
+ }
+ }
+ result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ router_list = tgen.routers()
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ logger.info("Checking router %s FIB routes:", router)
+
+ # Verifying RIB routes
+ if addr_type == "ipv4":
+ command = "show ip fib"
+ else:
+ command = "show ipv6 fib"
+
+ found_routes = []
+ missing_routes = []
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ if "vrf" in static_route and static_route["vrf"] is not None:
+
+ logger.info(
+ "[DUT: {}]: Verifying routes for VRF:"
+ " {}".format(router, static_route["vrf"])
+ )
+
+ cmd = "{} vrf {}".format(command, static_route["vrf"])
+
+ else:
+ cmd = "{}".format(command)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "[DUT: {}]: No route found in fib".format(router)
+ return errormsg
+
+ network = static_route["network"]
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddress.ip_network(unicode(st_rt)))
+ #st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ count = 0
+ for nh in next_hop:
+ for nh_dict in rib_routes_json[st_rt][0][
+ "nexthops"
+ ]:
+ if nh_dict["ip"] != nh:
+ continue
+ else:
+ count += 1
+
+ if count == len(next_hop):
+ nh_found = True
+ else:
+ missing_routes.append(st_rt)
+ errormsg = (
+ "Nexthop {} is Missing"
+ " for route {} in "
+ "RIB of router {}\n".format(
+ next_hop, st_rt, dut
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
+ dut, missing_routes
+ )
+ return errormsg
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all routes in RIB"
+ " of router {}\n".format(next_hop, dut)
+ )
+
+ if found_routes:
+ logger.info(
+ "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
+ dut,
+ found_routes,
+ )
+
+ continue
+
+ if "bgp" in input_dict[routerInput]:
+ if (
+ "advertise_networks"
+ not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
+ "unicast"
+ ]
+ ):
+ continue
+
+ found_routes = []
+ missing_routes = []
+ advertise_network = input_dict[routerInput]["bgp"]["address_family"][
+ addr_type
+ ]["unicast"]["advertise_networks"]
+
+ # Continue if there are no network advertise
+ if len(advertise_network) == 0:
+ continue
+
+ for advertise_network_dict in advertise_network:
+ if "vrf" in advertise_network_dict:
+ cmd = "{} vrf {} json".format(command, static_route["vrf"])
+ else:
+ cmd = "{} json".format(command)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ start_ip = advertise_network_dict["network"]
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(start_ip, no_of_network)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ #st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+ st_rt = str(ipaddress.ip_network(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ count = 0
+ for nh in next_hop:
+ for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
+ if nh_dict["ip"] != nh:
+ continue
+ else:
+ count += 1
+
+ if count == len(next_hop):
+ nh_found = True
+ else:
+ missing_routes.append(st_rt)
+ errormsg = (
+ "Nexthop {} is Missing"
+ " for route {} in "
+ "RIB of router {}\n".format(next_hop, st_rt, dut)
+ )
+ return errormsg
+ else:
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
+ dut, missing_routes
+ )
+ return errormsg
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all routes in RIB"
+ " of router {}\n".format(next_hop, dut)
+ )
+
+ if found_routes:
+ logger.info(
+ "[DUT: {}]: Verified routes FIB"
+ ", found routes are: {}\n".format(dut, found_routes)
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
+ """
+ Data will be read from input_dict or input JSON file, API will generate
+ same prefixes, which were redistributed by either create_static_routes() or
+ advertise_networks_using_network_command() and will verify next_hop and
+ each prefix/routes is present in "show ip/ipv6 fib json"
+ command o/p.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test, for which user wants to test the data
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default: static
+
+ Usage
+ -----
+ input_routes_r1 = {
+ "r1": {
+ "static_routes": [{
+ "network": ["1.1.1.1/32],
+ "next_hop": "Null0",
+ "vrf": "RED"
+ }]
+ }
+ }
+ result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ router_list = tgen.routers()
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ logger.info("Checking router %s FIB routes:", router)
+
+ # Verifying RIB routes
+ if addr_type == "ipv4":
+ command = "show ip fib"
+ else:
+ command = "show ipv6 fib"
+
+ found_routes = []
+ missing_routes = []
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ if "vrf" in static_route and static_route["vrf"] is not None:
+
+ logger.info(
+ "[DUT: {}]: Verifying routes for VRF:"
+ " {}".format(router, static_route["vrf"])
+ )
+
+ cmd = "{} vrf {}".format(command, static_route["vrf"])
+
+ else:
+ cmd = "{}".format(command)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "[DUT: {}]: No route found in fib".format(router)
+ return errormsg
+
+ network = static_route["network"]
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddress.ip_network(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ count = 0
+ for nh in next_hop:
+ for nh_dict in rib_routes_json[st_rt][0][
+ "nexthops"
+ ]:
+ if nh_dict["ip"] != nh:
+ continue
+ else:
+ count += 1
+
+ if count == len(next_hop):
+ nh_found = True
+ else:
+ missing_routes.append(st_rt)
+ errormsg = (
+ "Nexthop {} is Missing"
+ " for route {} in "
+ "RIB of router {}\n".format(
+ next_hop, st_rt, dut
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
+ dut, missing_routes
+ )
+ return errormsg
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all routes in RIB"
+ " of router {}\n".format(next_hop, dut)
+ )
+
+ if found_routes:
+ logger.info(
+ "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
+ dut,
+ found_routes,
+ )
+
+ continue
+
+ if "bgp" in input_dict[routerInput]:
+ if (
+ "advertise_networks"
+ not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
+ "unicast"
+ ]
+ ):
+ continue
+
+ found_routes = []
+ missing_routes = []
+ advertise_network = input_dict[routerInput]["bgp"]["address_family"][
+ addr_type
+ ]["unicast"]["advertise_networks"]
+
+ # Continue if there are no network advertise
+ if len(advertise_network) == 0:
+ continue
+
+ for advertise_network_dict in advertise_network:
+ if "vrf" in advertise_network_dict:
+ cmd = "{} vrf {} json".format(command, static_route["vrf"])
+ else:
+ cmd = "{} json".format(command)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ start_ip = advertise_network_dict["network"]
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(start_ip, no_of_network)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddress.ip_network(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ count = 0
+ for nh in next_hop:
+ for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
+ if nh_dict["ip"] != nh:
+ continue
+ else:
+ count += 1
+
+ if count == len(next_hop):
+ nh_found = True
+ else:
+ missing_routes.append(st_rt)
+ errormsg = (
+ "Nexthop {} is Missing"
+ " for route {} in "
+ "RIB of router {}\n".format(next_hop, st_rt, dut)
+ )
+ return errormsg
+ else:
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
+ dut, missing_routes
+ )
+ return errormsg
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all routes in RIB"
+ " of router {}\n".format(next_hop, dut)
+ )
+
+ if found_routes:
+ logger.info(
+ "[DUT: {}]: Verified routes FIB"
+ ", found routes are: {}\n".format(dut, found_routes)
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True