summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/common_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--tests/topotests/lib/common_config.py844
1 files changed, 702 insertions, 142 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index 0b19877aff..3de7ab3ebe 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -175,6 +175,49 @@ def run_frr_cmd(rnode, cmd, isjson=False):
raise InvalidCLIError("No actual cmd passed")
+def apply_raw_config(tgen, input_dict):
+
+ """
+ API to configure raw configuration on device. This can be used for any cli
+ which does has not been implemented in JSON.
+
+ Parameters
+ ----------
+ * `tgen`: tgen onject
+ * `input_dict`: configuration that needs to be applied
+
+ Usage
+ -----
+ input_dict = {
+ "r2": {
+ "raw_config": [
+ "router bgp",
+ "no bgp update-group-split-horizon"
+ ]
+ }
+ }
+ Returns
+ -------
+ True or errormsg
+ """
+
+ result = True
+ for router_name in input_dict.keys():
+ config_cmd = input_dict[router_name]["raw_config"]
+
+ if not isinstance(config_cmd, list):
+ config_cmd = [config_cmd]
+
+ frr_cfg_file = "{}/{}/{}".format(TMPDIR, router_name, FRRCFG_FILE)
+ with open(frr_cfg_file, "w") as cfg:
+ for cmd in config_cmd:
+ cfg.write("{}\n".format(cmd))
+
+ result = load_config_to_router(tgen, router_name)
+
+ return result
+
+
def create_common_configuration(
tgen, router, data, config_type=None, build=False, load_config=True
):
@@ -207,6 +250,7 @@ def create_common_configuration(
"bgp_community_list": "! Community List Config\n",
"route_maps": "! Route Maps Config\n",
"bgp": "! BGP Config\n",
+ "vrf": "! VRF Config\n",
}
)
@@ -355,6 +399,7 @@ def reset_config_on_routers(tgen, routerName=None):
"""
Resets configuration on routers to the snapshot created using input JSON
file. It replaces existing router configuration with FRRCFG_BKUP_FILE
+
Parameters
----------
* `tgen` : Topogen object
@@ -370,6 +415,7 @@ def reset_config_on_routers(tgen, routerName=None):
router = router_list[rname]
logger.info("Configuring router %s to initial test configuration", rname)
+
cfg = router.run("vtysh -c 'show running'")
fname = "{}/{}/frr.sav".format(TMPDIR, rname)
dname = "{}/{}/delta.conf".format(TMPDIR, rname)
@@ -387,22 +433,13 @@ def reset_config_on_routers(tgen, routerName=None):
f.write("\n")
f.close()
-
run_cfg_file = "{}/{}/frr.sav".format(TMPDIR, rname)
init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname)
-
- tempdir = mkdtemp()
- with open(os.path.join(tempdir, "vtysh.conf"), "w") as fd:
- pass
-
- command = "/usr/lib/frr/frr-reload.py --confdir {} --input {} --test {} > {}".format(
- tempdir, run_cfg_file, init_cfg_file, dname
+ command = "/usr/lib/frr/frr-reload.py --input {} --test {} > {}".format(
+ run_cfg_file, init_cfg_file, dname
)
result = call(command, shell=True, stderr=SUB_STDOUT, stdout=SUB_PIPE)
- os.unlink(os.path.join(tempdir, "vtysh.conf"))
- os.rmdir(tempdir)
-
# Assert if command fail
if result > 0:
logger.error("Delta file creation failed. Command executed %s", command)
@@ -459,17 +496,18 @@ def reset_config_on_routers(tgen, routerName=None):
# Router current configuration to log file or console if
# "show_router_config" is defined in "pytest.ini"
if show_router_config:
- logger.info("Configuration on router {} after config reset:".format(rname))
+ logger.info("Configuration on router {} after reset:".format(rname))
logger.info(delta.getvalue())
delta.close()
- logger.debug("Exting API: reset_config_on_routers")
+ logger.debug("Exiting API: reset_config_on_routers")
return True
def load_config_to_router(tgen, routerName, save_bkup=False):
"""
Loads configuration on router from the file FRRCFG_FILE.
+
Parameters
----------
* `tgen` : Topogen object
@@ -481,7 +519,7 @@ def load_config_to_router(tgen, routerName, save_bkup=False):
router_list = tgen.routers()
for rname in ROUTER_LIST:
- if routerName and routerName != rname:
+ if routerName and rname != routerName:
continue
router = router_list[rname]
@@ -504,6 +542,7 @@ def load_config_to_router(tgen, routerName, save_bkup=False):
raise InvalidCLIError("%s" % output)
cfg.truncate(0)
+
except IOError as err:
errormsg = (
"Unable to open config File. error(%s):" " %s",
@@ -514,24 +553,29 @@ def load_config_to_router(tgen, routerName, save_bkup=False):
# Router current configuration to log file or console if
# "show_router_config" is defined in "pytest.ini"
if show_router_config:
+ logger.info("New configuration for router {}:".format(rname))
new_config = router.run("vtysh -c 'show running'")
logger.info(new_config)
- logger.debug("Exting API: load_config_to_router")
+ logger.debug("Exiting API: load_config_to_router")
return True
-def get_frr_ipv6_linklocal(tgen, router, intf=None):
+def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
"""
API to get the link local ipv6 address of a perticular interface using
FRR command 'show interface'
+
* `tgen`: tgen onject
* `router` : router for which hightest interface should be
calculated
* `intf` : interface for which linklocal address needs to be taken
+ * `vrf` : VRF name
+
Usage
-----
linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
+
Returns
-------
1) array of interface names to link local ips.
@@ -544,7 +588,10 @@ def get_frr_ipv6_linklocal(tgen, router, intf=None):
linklocal = []
- cmd = "show interface"
+ if vrf:
+ cmd = "show interface vrf {}".format(vrf)
+ else:
+ cmd = "show interface"
ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd))
@@ -635,6 +682,55 @@ def start_topology(tgen):
tgen.start_router()
+def stop_router(tgen, router):
+ """
+ Router"s current config would be saved to /etc/frr/ for each deamon
+ and router and its deamons would be stopped.
+
+ * `tgen` : topogen object
+ * `router`: Device under test
+ """
+
+ router_list = tgen.routers()
+
+ # Saving router config to /etc/frr, which will be loaded to router
+ # when it starts
+ router_list[router].vtysh_cmd("write memory")
+
+ # Stop router
+ router_list[router].stop()
+
+
+def start_router(tgen, router):
+ """
+ Router will started and config would be loaded from /etc/frr/ for each
+ deamon
+
+ * `tgen` : topogen object
+ * `router`: Device under test
+ """
+
+ logger.debug("Entering lib API: start_router")
+
+ try:
+ router_list = tgen.routers()
+
+ # Router and its deamons would be started and config would
+ # be loaded to router for each deamon from /etc/frr
+ router_list[router].start()
+
+ # Waiting for router to come up
+ sleep(5)
+
+ except Exception as e:
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: start_router()")
+ return True
+
+
def number_to_row(routerName):
"""
Returns the number for the router.
@@ -658,6 +754,190 @@ def number_to_column(routerName):
#############################################
+def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
+ """
+ Create vrf configuration for created topology. VRF
+ configuration is provided in input json file.
+
+ VRF config is done in Linux Kernel:
+ * Create VRF
+ * Attach interface to VRF
+ * Bring up VRF
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring
+ from testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict={
+ "r3": {
+ "links": {
+ "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
+ "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
+ "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
+ "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
+ },
+ "vrfs":[
+ {
+ "name": "RED_A",
+ "id": "1"
+ },
+ {
+ "name": "RED_B",
+ "id": "2"
+ },
+ {
+ "name": "BLUE_A",
+ "id": "3",
+ "delete": True
+ },
+ {
+ "name": "BLUE_B",
+ "id": "4"
+ }
+ ]
+ }
+ }
+ result = create_vrf_cfg(tgen, topo, input_dict)
+
+ Returns
+ -------
+ True or False
+ """
+ result = True
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ input_dict = deepcopy(input_dict)
+
+ try:
+ for c_router, c_data in input_dict.iteritems():
+ rnode = tgen.routers()[c_router]
+ if "vrfs" in c_data:
+ for vrf in c_data["vrfs"]:
+ config_data = []
+ del_action = vrf.setdefault("delete", False)
+ name = vrf.setdefault("name", None)
+ table_id = vrf.setdefault("id", None)
+ vni = vrf.setdefault("vni", None)
+ del_vni = vrf.setdefault("no_vni", None)
+
+ if del_action:
+ # Kernel cmd- Add VRF and table
+ cmd = "ip link del {} type vrf table {}".format(
+ vrf["name"], vrf["id"]
+ )
+
+ logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
+ rnode.run(cmd)
+
+ # Kernel cmd - Bring down VRF
+ cmd = "ip link set dev {} down".format(name)
+ logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
+ rnode.run(cmd)
+
+ else:
+ if name and table_id:
+ # Kernel cmd- Add VRF and table
+ cmd = "ip link add {} type vrf table {}".format(
+ name, table_id
+ )
+ logger.info(
+ "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
+ )
+ rnode.run(cmd)
+
+ # Kernel cmd - Bring up VRF
+ cmd = "ip link set dev {} up".format(name)
+ logger.info(
+ "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
+ )
+ rnode.run(cmd)
+
+ if "links" in c_data:
+ for destRouterLink, data in sorted(
+ c_data["links"].iteritems()
+ ):
+ # Loopback interfaces
+ if "type" in data and data["type"] == "loopback":
+ interface_name = destRouterLink
+ else:
+ interface_name = data["interface"]
+
+ if "vrf" in data:
+ vrf_list = data["vrf"]
+
+ if type(vrf_list) is not list:
+ vrf_list = [vrf_list]
+
+ for _vrf in vrf_list:
+ cmd = "ip link set {} master {}".format(
+ interface_name, _vrf
+ )
+
+ logger.info(
+ "[DUT: %s]: Running" " kernel cmd [%s]",
+ c_router,
+ cmd,
+ )
+ rnode.run(cmd)
+
+ result = create_common_configuration(
+ tgen, c_router, config_data, "vrf", build=build
+ )
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ return result
+
+
+def create_interface_in_kernel(
+ tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
+):
+ """
+ Cretae interfaces in kernel for ipv4/ipv6
+ Config is done in Linux Kernel:
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut` : Device for which interfaces to be added
+ * `name` : interface name
+ * `ip_addr` : ip address for interface
+ * `vrf` : VRF name, to which interface will be associated
+ * `netmask` : netmask value, default is None
+ * `create`: Create interface in kernel, if created then no need
+ to create
+ """
+
+ rnode = tgen.routers()[dut]
+
+ if create:
+ cmd = "sudo ip link add name {} type dummy".format(name)
+ rnode.run(cmd)
+
+ addr_type = validate_ip_address(ip_addr)
+ if addr_type == "ipv4":
+ cmd = "ifconfig {} {} netmask {}".format(name, ip_addr, netmask)
+ else:
+ cmd = "ifconfig {} inet6 add {}/{}".format(name, ip_addr, netmask)
+
+ rnode.run(cmd)
+
+ if vrf:
+ cmd = "ip link set {} master {}".format(name, vrf)
+ rnode.run(cmd)
+
+
def validate_ip_address(ip_address):
"""
Validates the type of ip address
@@ -860,13 +1140,15 @@ def interface_status(tgen, topo, input_dict):
return True
-def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
+def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict=False):
"""
Retries function execution, if return is an errormsg or exception
+
* `attempts`: Number of attempts to make
* `wait`: Number of seconds to wait between each attempt
* `return_is_str`: Return val is an errormsg in case of failure
* `initial_wait`: Sleeps for this much seconds before executing function
+
"""
def _retry(func):
@@ -883,15 +1165,20 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
sleep(initial_wait)
_return_is_str = kwargs.pop("return_is_str", return_is_str)
+ _return_is_dict = kwargs.pop("return_is_str", return_is_dict)
for i in range(1, _attempts + 1):
try:
_expected = kwargs.setdefault("expected", True)
kwargs.pop("expected")
ret = func(*args, **kwargs)
logger.debug("Function returned %s" % ret)
- if return_is_str and isinstance(ret, bool) and _expected:
+ if _return_is_str and isinstance(ret, bool) and _expected:
return ret
- if isinstance(ret, str) and _expected is False:
+ if (
+ isinstance(ret, str) or isinstance(ret, unicode)
+ ) and _expected is False:
+ return ret
+ if _return_is_dict and isinstance(ret, dict):
return ret
if _attempts == i:
@@ -945,16 +1232,19 @@ def create_interfaces_cfg(tgen, topo, build=False):
"""
Create interface configuration for created topology. Basic Interface
configuration is provided in input json file.
+
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `build` : Only for initial setup phase this is set as True.
+
Returns
-------
True or False
"""
result = False
+ topo = deepcopy(topo)
try:
for c_router, c_data in topo.iteritems():
@@ -965,13 +1255,30 @@ def create_interfaces_cfg(tgen, topo, build=False):
interface_name = destRouterLink
else:
interface_name = data["interface"]
+
interface_data.append("interface {}".format(str(interface_name)))
if "ipv4" in data:
intf_addr = c_data["links"][destRouterLink]["ipv4"]
- interface_data.append("ip address {}".format(intf_addr))
+
+ if "delete" in data and data["delete"]:
+ interface_data.append("no ip address {}".format(intf_addr))
+ else:
+ interface_data.append("ip address {}".format(intf_addr))
if "ipv6" in data:
intf_addr = c_data["links"][destRouterLink]["ipv6"]
- interface_data.append("ipv6 address {}".format(intf_addr))
+
+ if "delete" in data and data["delete"]:
+ interface_data.append("no ipv6 address {}".format(intf_addr))
+ else:
+ interface_data.append("ipv6 address {}".format(intf_addr))
+
+ if "ipv6-link-local" in data:
+ intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"]
+
+ if "delete" in data and data["delete"]:
+ interface_data.append("no ipv6 address {}".format(intf_addr))
+ else:
+ interface_data.append("ipv6 address {}\n".format(intf_addr))
result = create_common_configuration(
tgen, c_router, interface_data, "interface_config", build=build
@@ -988,11 +1295,13 @@ def create_interfaces_cfg(tgen, topo, build=False):
def create_static_routes(tgen, input_dict, build=False):
"""
Create static routes for given router as defined in input_dict
+
Parameters
----------
* `tgen` : Topogen object
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
+
Usage
-----
input_dict should be in the format below:
@@ -1002,7 +1311,9 @@ def create_static_routes(tgen, input_dict, build=False):
# admin_distance: admin distance for route/routes.
# next_hop: starting next-hop address
# tag: tag id for static routes
+ # vrf: VRF name in which static routes needs to be created
# delete: True if config to be removed. Default False.
+
Example:
"routers": {
"r1": {
@@ -1012,24 +1323,27 @@ def create_static_routes(tgen, input_dict, build=False):
"no_of_ip": 9,
"admin_distance": 100,
"next_hop": "10.0.0.1",
- "tag": 4001
+ "tag": 4001,
+ "vrf": "RED_A"
"delete": true
}
]
}
}
+
Returns
-------
errormsg(str) or True
"""
result = False
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Entering lib API: create_static_routes()")
input_dict = deepcopy(input_dict)
+
try:
for router in input_dict.keys():
if "static_routes" not in input_dict[router]:
errormsg = "static_routes not present in input_dict"
- logger.debug(errormsg)
+ logger.info(errormsg)
continue
static_routes_list = []
@@ -1037,27 +1351,38 @@ def create_static_routes(tgen, input_dict, build=False):
static_routes = input_dict[router]["static_routes"]
for static_route in static_routes:
del_action = static_route.setdefault("delete", False)
- # No of IPs
no_of_ip = static_route.setdefault("no_of_ip", 1)
- admin_distance = static_route.setdefault("admin_distance", None)
- tag = static_route.setdefault("tag", None)
- if "next_hop" not in static_route or "network" not in static_route:
- errormsg = "'next_hop' or 'network' missing in" " input_dict"
- return errormsg
-
- next_hop = static_route["next_hop"]
- network = static_route["network"]
+ network = static_route.setdefault("network", [])
if type(network) is not list:
network = [network]
+ admin_distance = static_route.setdefault("admin_distance", None)
+ tag = static_route.setdefault("tag", None)
+ vrf = static_route.setdefault("vrf", None)
+ interface = static_route.setdefault("interface", None)
+ next_hop = static_route.setdefault("next_hop", None)
+ nexthop_vrf = static_route.setdefault("nexthop_vrf", None)
+
ip_list = generate_ips(network, no_of_ip)
for ip in ip_list:
addr_type = validate_ip_address(ip)
if addr_type == "ipv4":
- cmd = "ip route {} {}".format(ip, next_hop)
+ cmd = "ip route {}".format(ip)
else:
- cmd = "ipv6 route {} {}".format(ip, next_hop)
+ cmd = "ipv6 route {}".format(ip)
+
+ if interface:
+ cmd = "{} {}".format(cmd, interface)
+
+ if next_hop:
+ cmd = "{} {}".format(cmd, next_hop)
+
+ if nexthop_vrf:
+ cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf)
+
+ if vrf:
+ cmd = "{} vrf {}".format(cmd, vrf)
if tag:
cmd = "{} tag {}".format(cmd, str(tag))
@@ -1080,7 +1405,7 @@ def create_static_routes(tgen, input_dict, build=False):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Exiting lib API: create_static_routes()")
return result
@@ -1433,6 +1758,9 @@ def create_route_maps(tgen, input_dict, build=False):
"large_community_list", {}
)
+ metric = match_data.setdefault("metric", None)
+ source_vrf = match_data.setdefault("source-vrf", None)
+
if ipv4_data:
# fetch prefix list data from rmap
prefix_name = ipv4_data.setdefault("prefix_lists", None)
@@ -1525,6 +1853,14 @@ def create_route_maps(tgen, input_dict, build=False):
cmd = "{} exact-match".format(cmd)
rmap_data.append(cmd)
+ if source_vrf:
+ cmd = "match source-vrf {}".format(source_vrf)
+ rmap_data.append(cmd)
+
+ if metric:
+ cmd = "match metric {}".format(metric)
+ rmap_data.append(cmd)
+
result = create_common_configuration(
tgen, router, rmap_data, "route_maps", build=build
)
@@ -1689,17 +2025,97 @@ def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
interface_set_status(router_list[dut], intf_name, ifaceaction)
+def addKernelRoute(
+ tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
+):
+ """
+ Add route to kernel
+
+ Parameters:
+ -----------
+ * `tgen` : Topogen object
+ * `router`: router for which kernal routes needs to be added
+ * `intf`: interface name, for which kernal routes needs to be added
+ * `bindToAddress`: bind to <host>, an interface or multicast
+ address
+
+ returns:
+ --------
+ errormsg or True
+ """
+
+ logger.debug("Entering lib API: addKernelRoute()")
+
+ rnode = tgen.routers()[router]
+
+ if type(group_addr_range) is not list:
+ group_addr_range = [group_addr_range]
+
+ for grp_addr in group_addr_range:
+
+ addr_type = validate_ip_address(grp_addr)
+ if addr_type == "ipv4":
+ if next_hop is not None:
+ cmd = "ip route add {} via {}".format(grp_addr, next_hop)
+ else:
+ cmd = "ip route add {} dev {}".format(grp_addr, intf)
+ if del_action:
+ cmd = "ip route del {}".format(grp_addr)
+ verify_cmd = "ip route"
+ elif addr_type == "ipv6":
+ if intf and src:
+ cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
+ else:
+ cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
+ verify_cmd = "ip -6 route"
+ if del_action:
+ cmd = "ip -6 route del {}".format(grp_addr)
+
+ logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
+ output = rnode.run(cmd)
+
+ # Verifying if ip route added to kernal
+ result = rnode.run(verify_cmd)
+ logger.debug("{}\n{}".format(verify_cmd, result))
+ if "/" in grp_addr:
+ ip, mask = grp_addr.split("/")
+ if mask == "32" or mask == "128":
+ grp_addr = ip
+
+ if not re_search(r"{}".format(grp_addr), result) and mask is not "0":
+ errormsg = (
+ "[DUT: {}]: Kernal route is not added for group"
+ " address {} Config output: {}".format(router, grp_addr, output)
+ )
+
+ return errormsg
+
+ logger.debug("Exiting lib API: addKernelRoute()")
+ return True
+
+
#############################################
# Verification APIs
#############################################
-@retry(attempts=10, return_is_str=True, initial_wait=2)
-def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_rib(
+ tgen,
+ addr_type,
+ dut,
+ input_dict,
+ next_hop=None,
+ protocol=None,
+ tag=None,
+ metric=None,
+ fib=None,
+):
"""
Data will be read from input_dict or input JSON file, API will generate
same prefixes, which were redistributed by either create_static_routes() or
advertise_networks_using_network_command() and do will verify next_hop and
each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
command o/p.
+
Parameters
----------
* `tgen` : topogen object
@@ -1709,12 +2125,14 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
* `next_hop`[optional]: next_hop which needs to be verified,
default: static
* `protocol`[optional]: protocol, default = None
+
Usage
-----
# RIB can be verified for static routes OR network advertised using
network command. Following are input_dicts to create static routes
and advertise networks using network command. Any one of the input_dict
can be passed to verify_rib() to verify routes in DUT"s RIB.
+
# Creating static routes for r1
input_dict = {
"r1": {
@@ -1732,186 +2150,328 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
dut = "r2"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
+
Returns
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.info("Entering lib API: verify_rib()")
router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ found_hops = []
for routerInput in input_dict.keys():
for router, rnode in router_list.iteritems():
if router != dut:
continue
+ logger.info("Checking router %s RIB:", router)
+
# Verifying RIB routes
if addr_type == "ipv4":
- if protocol:
- command = "show ip route {} json".format(protocol)
- else:
- command = "show ip route json"
+ command = "show ip route"
else:
- if protocol:
- command = "show ipv6 route {} json".format(protocol)
- else:
- command = "show ipv6 route json"
-
- logger.info("Checking router %s RIB:", router)
- rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
+ command = "show ipv6 route"
- # Verifying output dictionary rib_routes_json is not empty
- if bool(rib_routes_json) is False:
- errormsg = "No {} route found in rib of router {}..".format(
- protocol, router
- )
- return errormsg
+ found_routes = []
+ missing_routes = []
if "static_routes" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["static_routes"]
- st_found = False
- nh_found = False
- found_routes = []
- missing_routes = []
+
for static_route in static_routes:
+ if "vrf" in static_route and static_route["vrf"] is not None:
+
+ logger.info(
+ "[DUT: {}]: Verifying routes for VRF:"
+ " {}".format(router, static_route["vrf"])
+ )
+
+ cmd = "{} vrf {}".format(command, static_route["vrf"])
+
+ else:
+ cmd = "{}".format(command)
+
+ if protocol:
+ cmd = "{} {}".format(cmd, protocol)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
network = static_route["network"]
if "no_of_ip" in static_route:
no_of_ip = static_route["no_of_ip"]
else:
no_of_ip = 1
+ if "tag" in static_route:
+ _tag = static_route["tag"]
+ else:
+ _tag = None
+
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
+ st_found = False
+ nh_found = False
+
for st_rt in ip_list:
st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
if st_rt in rib_routes_json:
st_found = True
found_routes.append(st_rt)
- if next_hop:
+ if fib and next_hop:
if type(next_hop) is not list:
next_hop = [next_hop]
+ for mnh in range(0, len(rib_routes_json[st_rt])):
+ if (
+ "fib"
+ in rib_routes_json[st_rt][mnh]["nexthops"][0]
+ ):
+ found_hops.append(
+ [
+ rib_r["ip"]
+ for rib_r in rib_routes_json[st_rt][
+ mnh
+ ]["nexthops"]
+ ]
+ )
+
+ if found_hops[0]:
+ missing_list_of_nexthops = set(
+ found_hops[0]
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops[0])
+
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Nexthop "
+ "%s is not active for route %s in "
+ "RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is not active"
+ " for route {} in RIB of router"
+ " {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+
+ elif next_hop and fib is None:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
found_hops = [
rib_r["ip"]
for rib_r in rib_routes_json[st_rt][0]["nexthops"]
]
- for nh in found_hops:
- nh_found = False
- if nh and nh in next_hop:
- nh_found = True
- else:
+
+ if found_hops:
+ missing_list_of_nexthops = set(
+ found_hops
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops)
+
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
errormsg = (
- "Nexthop {} is Missing for {}"
- " route {} in RIB of router"
- " {}\n".format(
- next_hop, protocol, st_rt, dut
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
)
)
-
return errormsg
+ else:
+ nh_found = True
+
+ if tag:
+ if "tag" not in rib_routes_json[st_rt][0]:
+ errormsg = (
+ "[DUT: {}]: tag is not"
+ " present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+
+ if _tag != rib_routes_json[st_rt][0]["tag"]:
+ errormsg = (
+ "[DUT: {}]: tag value {}"
+ " is not matched for"
+ " route {} in RIB \n".format(dut, _tag, st_rt,)
+ )
+ return errormsg
+
+ if metric is not None:
+ if "metric" not in rib_routes_json[st_rt][0]:
+ errormsg = (
+ "[DUT: {}]: metric is"
+ " not present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+
+ if metric != rib_routes_json[st_rt][0]["metric"]:
+ errormsg = (
+ "[DUT: {}]: metric value "
+ "{} is not matched for "
+ "route {} in RIB \n".format(dut, metric, st_rt,)
+ )
+ return errormsg
+
else:
missing_routes.append(st_rt)
if nh_found:
logger.info(
- "Found next_hop %s for all routes in RIB of" " router %s\n",
- next_hop,
- dut,
+ "[DUT: {}]: Found next_hop {} for all bgp"
+ " routes in RIB".format(router, next_hop)
)
- if not st_found and len(missing_routes) > 0:
- errormsg = (
- "Missing route in RIB of router {}, routes: "
- "{}\n".format(dut, missing_routes)
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
+ dut, missing_routes
)
return errormsg
- logger.info(
- "Verified routes in router %s RIB, found routes" " are: %s\n",
- dut,
- found_routes,
- )
+ if found_routes:
+ logger.info(
+ "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
+ dut,
+ found_routes,
+ )
continue
if "bgp" in input_dict[routerInput]:
if (
"advertise_networks"
- in input_dict[routerInput]["bgp"]["address_family"][addr_type][
+ not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
"unicast"
]
):
+ continue
- found_routes = []
- missing_routes = []
- advertise_network = input_dict[routerInput]["bgp"][
- "address_family"
- ][addr_type]["unicast"]["advertise_networks"]
+ found_routes = []
+ missing_routes = []
+ advertise_network = input_dict[routerInput]["bgp"]["address_family"][
+ addr_type
+ ]["unicast"]["advertise_networks"]
- for advertise_network_dict in advertise_network:
- start_ip = advertise_network_dict["network"]
- if "no_of_network" in advertise_network_dict:
- no_of_network = advertise_network_dict["no_of_network"]
- else:
- no_of_network = 1
-
- # Generating IPs for verification
- ip_list = generate_ips(start_ip, no_of_network)
- for st_rt in ip_list:
- st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
-
- found = False
- nh_found = False
- if st_rt in rib_routes_json:
- found = True
- found_routes.append(st_rt)
-
- if next_hop:
- if type(next_hop) is not list:
- next_hop = [next_hop]
-
- for nh in next_hop:
- for nh_json in rib_routes_json[st_rt][0][
- "nexthops"
- ]:
- if nh != nh_json["ip"]:
- continue
- nh_found = True
-
- if not nh_found:
- errormsg = (
- "Nexthop {} is Missing"
- " for {} route {} in "
- "RIB of router {}\n".format(
- next_hop, protocol, st_rt, dut
- )
- )
- return errormsg
+ # Continue if there are no network advertise
+ if len(advertise_network) == 0:
+ continue
+
+ for advertise_network_dict in advertise_network:
+ if "vrf" in advertise_network_dict:
+ cmd = "{} vrf {} json".format(command, static_route["vrf"])
+ else:
+ cmd = "{} json".format(command)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ start_ip = advertise_network_dict["network"]
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(start_ip, no_of_network)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ count = 0
+ for nh in next_hop:
+ for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
+ if nh_dict["ip"] != nh:
+ continue
+ else:
+ count += 1
+ if count == len(next_hop):
+ nh_found = True
else:
- missing_routes.append(st_rt)
+ errormsg = (
+ "Nexthop {} is Missing"
+ " for route {} in "
+ "RIB of router {}\n".format(next_hop, st_rt, dut)
+ )
+ return errormsg
+ else:
+ missing_routes.append(st_rt)
- if nh_found:
- logger.info(
- "Found next_hop {} for all routes in RIB"
- " of router {}\n".format(next_hop, dut)
- )
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all routes in RIB"
+ " of router {}\n".format(next_hop, dut)
+ )
- if not found and len(missing_routes) > 0:
- errormsg = (
- "Missing {} route in RIB of router {}, "
- "routes: {} \n".format(addr_type, dut, missing_routes)
- )
- return errormsg
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing {} route in RIB of router {}, "
+ "routes: {} \n".format(addr_type, dut, missing_routes)
+ )
+ return errormsg
+ if found_routes:
logger.info(
"Verified {} routes in router {} RIB, found"
" routes are: {}\n".format(addr_type, dut, found_routes)
)
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.info("Exiting lib API: verify_rib()")
return True