summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py1182
1 files changed, 1182 insertions, 0 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
new file mode 100644
index 0000000000..a2351bf747
--- /dev/null
+++ b/tests/topotests/lib/ospf.py
@@ -0,0 +1,1182 @@
+#
+# Copyright (c) 2020 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
+# ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+from copy import deepcopy
+import traceback
+from time import sleep
+from lib.topolog import logger
+import ipaddr
+
+
+# Import common_config to use commomnly used APIs
+from lib.common_config import (create_common_configuration,
+ InvalidCLIError, retry,
+ generate_ips,
+ check_address_types,
+ validate_ip_address,
+ run_frr_cmd)
+
+LOGDIR = "/tmp/topotests/"
+TMPDIR = None
+
+################################
+# Configure procs
+################################
+
+def create_router_ospf(
+ tgen, topo, input_dict=None, build=False,
+ load_config=True):
+ """
+ API to configure ospf on router.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+ * `load_config` : Loading the config to router this is set as True.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "ospf": {
+ "router_id": "22.22.22.22",
+ "area": [{ "id":0.0.0.0, "type": "nssa"}]
+ }
+ }
+
+ result = create_router_ospf(tgen, topo, input_dict)
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Entering lib API: create_router_ospf()")
+ result = False
+
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ topo = topo["routers"]
+ input_dict = deepcopy(input_dict)
+
+ for router in input_dict.keys():
+ if "ospf" not in input_dict[router]:
+ logger.debug("Router %s: 'ospf' not present in input_dict", router)
+ continue
+
+ result = __create_ospf_global(
+ tgen, input_dict, router, build, load_config)
+ if result is True:
+ ospf_data = input_dict[router]["ospf"]
+
+
+ logger.debug("Exiting lib API: create_router_ospf()")
+ return result
+
+
+def __create_ospf_global(
+ tgen, input_dict, router, build=False,
+ load_config=True):
+ """
+ Helper API to create ospf global configuration.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router to be configured.
+ * `build` : Only for initial setup phase this is set as True.
+ * `load_config` : Loading the config to router this is set as True.
+
+ Returns
+ -------
+ True or False
+ """
+
+ result = False
+ logger.debug("Entering lib API: __create_ospf_global()")
+ try:
+
+ ospf_data = input_dict[router]["ospf"]
+ del_ospf_action = ospf_data.setdefault("delete", False)
+ if del_ospf_action:
+ config_data = ["no router ospf"]
+ result = create_common_configuration(tgen, router, config_data,
+ "ospf", build,
+ load_config)
+ return result
+
+ config_data = []
+ cmd = "router ospf"
+
+ config_data.append(cmd)
+
+ # router id
+ router_id = ospf_data.setdefault("router_id", None)
+ del_router_id = ospf_data.setdefault("del_router_id", False)
+ if del_router_id:
+ config_data.append("no ospf router-id")
+ if router_id:
+ config_data.append("ospf router-id {}".format(
+ router_id))
+
+ # redistribute command
+ redistribute_data = ospf_data.setdefault("redistribute", {})
+ if redistribute_data:
+ for redistribute in redistribute_data:
+ if "redist_type" not in redistribute:
+ logger.debug("Router %s: 'redist_type' not present in "
+ "input_dict", router)
+ else:
+ cmd = "redistribute {}".format(
+ redistribute["redist_type"])
+ for red_type in redistribute_data:
+ if "route_map" in red_type:
+ cmd = cmd + " route-map {}".format(red_type[
+ 'route_map'])
+ del_action = redistribute.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ #area information
+ area_data = ospf_data.setdefault("area", {})
+ if area_data:
+ for area in area_data:
+ if "id" not in area:
+ logger.debug("Router %s: 'area id' not present in "
+ "input_dict", router)
+ else:
+ cmd = "area {}".format(area["id"])
+
+ if "type" in area:
+ cmd = cmd + " {}".format(area["type"])
+
+ del_action = area.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ result = create_common_configuration(tgen, router, config_data,
+ "ospf", build, load_config)
+
+ # summary information
+ summary_data = ospf_data.setdefault("summary-address", {})
+ if summary_data:
+ for summary in summary_data:
+ if "prefix" not in summary:
+ logger.debug("Router %s: 'summary-address' not present in "
+ "input_dict", router)
+ else:
+ cmd = "summary {}/{}".format(summary["prefix"], summary[
+ "mask"])
+
+ _tag = summary.setdefault("tag", None)
+ if _tag:
+ cmd = "{} tag {}".format(cmd, _tag)
+
+ _advertise = summary.setdefault("advertise", True)
+ if not _advertise:
+ cmd = "{} no-advertise".format(cmd)
+
+ del_action = summary.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ result = create_common_configuration(tgen, router, config_data,
+ "ospf", build, load_config)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_ospf_global()")
+ return result
+
+
+def create_router_ospf6(
+ tgen, topo, input_dict=None, build=False,
+ load_config=True):
+ """
+ API to configure ospf on router
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "ospf6": {
+ "router_id": "22.22.22.22",
+ }
+ }
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Entering lib API: create_router_ospf()")
+ result = False
+
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ topo = topo["routers"]
+ input_dict = deepcopy(input_dict)
+ for router in input_dict.keys():
+ if "ospf" not in input_dict[router]:
+ logger.debug("Router %s: 'ospf' not present in input_dict", router)
+ continue
+
+ result = __create_ospf_global(
+ tgen, input_dict, router, build, load_config)
+
+ logger.debug("Exiting lib API: create_router_ospf()")
+ return result
+
+
+def __create_ospf6_global(
+ tgen, input_dict, router, build=False,
+ load_config=True):
+ """
+ Helper API to create ospf global configuration.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router id to be configured.
+ * `build` : Only for initial setup phase this is set as True.
+
+ Returns
+ -------
+ True or False
+ """
+
+ result = False
+ logger.debug("Entering lib API: __create_ospf_global()")
+ try:
+
+ ospf_data = input_dict[router]["ospf6"]
+ del_ospf_action = ospf_data.setdefault("delete", False)
+ if del_ospf_action:
+ config_data = ["no ipv6 router ospf"]
+ result = create_common_configuration(tgen, router, config_data,
+ "ospf", build,
+ load_config)
+ return result
+
+ config_data = []
+ cmd = "router ospf"
+
+ config_data.append(cmd)
+
+ router_id = ospf_data.setdefault("router_id", None)
+ del_router_id = ospf_data.setdefault("del_router_id", False)
+ if del_router_id:
+ config_data.append("no ospf router-id")
+ if router_id:
+ config_data.append("ospf router-id {}".format(
+ router_id))
+
+ result = create_common_configuration(tgen, router, config_data,
+ "ospf", build, load_config)
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_ospf_global()")
+ return result
+
+def config_ospf_interface (tgen, topo, input_dict=None, build=False,
+ load_config=True):
+ """
+ API to configure ospf on router.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+ * `load_config` : Loading the config to router this is set as True.
+
+ Usage
+ -----
+ r1_ospf_auth = {
+ "r1": {
+ "links": {
+ "r2": {
+ "ospf": {
+ "authentication": 'message-digest',
+ "authentication-key": "ospf",
+ "message-digest-key": "10"
+ }
+ }
+ }
+ }
+ }
+ result = config_ospf_interface(tgen, topo, r1_ospf_auth)
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Enter lib config_ospf_interface")
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ input_dict = deepcopy(input_dict)
+ for router in input_dict.keys():
+ config_data = []
+ for lnk in input_dict[router]['links'].keys():
+ if "ospf" not in input_dict[router]['links'][lnk]:
+ logger.debug("Router %s: ospf configs is not present in"
+ "input_dict, passed input_dict", router,
+ input_dict)
+ continue
+ ospf_data = input_dict[router]['links'][lnk]['ospf']
+ data_ospf_area = ospf_data.setdefault("area", None)
+ data_ospf_auth = ospf_data.setdefault("authentication", None)
+ data_ospf_dr_priority = ospf_data.setdefault("priority", None)
+ data_ospf_cost = ospf_data.setdefault("cost", None)
+
+ try:
+ intf = topo['routers'][router]['links'][lnk]['interface']
+ except KeyError:
+ intf = topo['switches'][router]['links'][lnk]['interface']
+
+ # interface
+ cmd = "interface {}".format(intf)
+
+ config_data.append(cmd)
+ # interface area config
+ if data_ospf_area:
+ cmd = "ip ospf area {}".format(data_ospf_area)
+ config_data.append(cmd)
+ # interface ospf auth
+ if data_ospf_auth:
+ if data_ospf_auth == 'null':
+ cmd = "ip ospf authentication null"
+ elif data_ospf_auth == 'message-digest':
+ cmd = "ip ospf authentication message-digest"
+ else:
+ cmd = "ip ospf authentication"
+
+ if 'del_action' in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "message-digest-key" in ospf_data:
+ cmd = "ip ospf message-digest-key {} md5 {}".format(
+ ospf_data["message-digest-key"],ospf_data[
+ "authentication-key"])
+ if 'del_action' in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "authentication-key" in ospf_data and \
+ "message-digest-key" not in ospf_data:
+ cmd = "ip ospf authentication-key {}".format(ospf_data[
+ "authentication-key"])
+ if 'del_action' in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # interface ospf dr priority
+ if data_ospf_dr_priority in ospf_data:
+ cmd = "ip ospf priority {}".format(
+ ospf_data["priority"])
+ if 'del_action' in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # interface ospf cost
+ if data_ospf_cost in ospf_data:
+ cmd = "ip ospf cost {}".format(
+ ospf_data["cost"])
+ if 'del_action' in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if build:
+ return config_data
+ else:
+ result = create_common_configuration(tgen, router, config_data,
+ "interface_config",
+ build=build)
+ logger.debug("Exiting lib API: create_igmp_config()")
+ return result
+
+def clear_ospf(tgen, router):
+ """
+ This API is to clear ospf neighborship by running
+ clear ip ospf interface * command,
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `router`: device under test
+
+ Usage
+ -----
+ clear_ospf(tgen, "r1")
+ """
+
+ logger.debug("Entering lib API: clear_ospf()")
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ # Clearing OSPF
+ logger.info("Clearing ospf process for router %s..", router)
+
+ run_frr_cmd(rnode, "clear ip ospf interface ")
+
+ logger.debug("Exiting lib API: clear_ospf()")
+
+
+################################
+# Verification procs
+################################
+@retry(attempts=40, wait=2, return_is_str=True)
+def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
+ """
+ This API is to verify ospf neighborship by running
+ show ip ospf neighbour command,
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `lan` : verify neighbors in lan topology
+
+ Usage
+ -----
+ 1. To check FULL neighbors.
+ verify_ospf_neighbor(tgen, topo, dut=dut)
+
+ 2. To check neighbors with their roles.
+ input_dict = {
+ "r0": {
+ "ospf": {
+ "neighbors": {
+ "r1": {
+ "state": "Full",
+ "role": "DR"
+ },
+ "r2": {
+ "state": "Full",
+ "role": "DROther"
+ },
+ "r3": {
+ "state": "Full",
+ "role": "DROther"
+ }
+ }
+ }
+ }
+ }
+ result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+ logger.debug("Entering lib API: verify_ospf_neighbor()")
+ result = False
+ if input_dict:
+ for router, rnode in tgen.routers().iteritems():
+ if 'ospf' not in topo['routers'][router]:
+ continue
+
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying OSPF neighborship on router %s:", router)
+ show_ospf_json = run_frr_cmd(rnode,
+ "show ip ospf neighbor all json", isjson=True)
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ ospf_data_list = input_dict[router]["ospf"]
+ ospf_nbr_list = ospf_data_list['neighbors']
+
+ for ospf_nbr, nbr_data in ospf_nbr_list.items():
+ data_ip = topo['routers'][ospf_nbr]['links']
+ data_rid = topo['routers'][ospf_nbr]['ospf']['router_id']
+ if ospf_nbr in data_ip:
+ nbr_details = nbr_data[ospf_nbr]
+ elif lan:
+ for switch in topo['switches']:
+ if 'ospf' in topo['switches'][switch]['links'][router]:
+ neighbor_ip = data_ip[switch]['ipv4'].split("/")[0]
+ else:
+ continue
+ else:
+ neighbor_ip = data_ip[router]['ipv4'].split("/")[0]
+
+ nh_state = None
+ neighbor_ip = neighbor_ip.lower()
+ nbr_rid = data_rid
+ try:
+ nh_state = show_ospf_json[nbr_rid][0][
+ 'state'].split('/')[0]
+ intf_state = show_ospf_json[nbr_rid][0][
+ 'state'].split('/')[1]
+ except KeyError:
+ errormsg = "[DUT: {}] OSPF peer {} missing".format(router,
+ nbr_rid)
+ return errormsg
+
+ nbr_state = nbr_data.setdefault("state",None)
+ nbr_role = nbr_data.setdefault("role",None)
+
+ if nbr_state:
+ if nbr_state == nh_state:
+ logger.info("[DUT: {}] OSPF Nbr is {}:{} State {}".format
+ (router, ospf_nbr, nbr_rid, nh_state))
+ result = True
+ else:
+ errormsg = ("[DUT: {}] OSPF is not Converged, neighbor"
+ " state is {}".format(router, nh_state))
+ return errormsg
+ if nbr_role:
+ if nbr_role == intf_state:
+ logger.info("[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
+ router, ospf_nbr, nbr_rid, nbr_role))
+ else:
+ errormsg = ("[DUT: {}] OSPF is not Converged with rid"
+ "{}, role is {}".format(router, nbr_rid, intf_state))
+ return errormsg
+ continue
+ else:
+ for router, rnode in tgen.routers().iteritems():
+ if 'ospf' not in topo['routers'][router]:
+ continue
+
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying OSPF neighborship on router %s:", router)
+ show_ospf_json = run_frr_cmd(rnode,
+ "show ip ospf neighbor all json", isjson=True)
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ ospf_data_list = topo["routers"][router]["ospf"]
+ ospf_neighbors = ospf_data_list['neighbors']
+ total_peer = 0
+ total_peer = len(ospf_neighbors.keys())
+ no_of_ospf_nbr = 0
+ ospf_nbr_list = ospf_data_list['neighbors']
+ no_of_peer = 0
+ for ospf_nbr, nbr_data in ospf_nbr_list.items():
+ if nbr_data:
+ data_ip = topo['routers'][nbr_data["nbr"]]['links']
+ data_rid = topo['routers'][nbr_data["nbr"]][
+ 'ospf']['router_id']
+ else:
+ data_ip = topo['routers'][ospf_nbr]['links']
+ data_rid = topo['routers'][ospf_nbr]['ospf']['router_id']
+ if ospf_nbr in data_ip:
+ nbr_details = nbr_data[ospf_nbr]
+ elif lan:
+ for switch in topo['switches']:
+ if 'ospf' in topo['switches'][switch]['links'][router]:
+ neighbor_ip = data_ip[switch]['ipv4'].split("/")[0]
+ else:
+ continue
+ else:
+ neighbor_ip = data_ip[router]['ipv4'].split("/")[0]
+
+ nh_state = None
+ neighbor_ip = neighbor_ip.lower()
+ nbr_rid = data_rid
+ try:
+ nh_state = show_ospf_json[nbr_rid][0][
+ 'state'].split('/')[0]
+ except KeyError:
+ errormsg = "[DUT: {}] OSPF peer {} missing,from "\
+ "{} ".format(router,
+ nbr_rid, ospf_nbr)
+ return errormsg
+
+ if nh_state == 'Full':
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("[DUT: {}] OSPF is Converged".format(router))
+ result = True
+ else:
+ errormsg = ("[DUT: {}] OSPF is not Converged".format(router))
+ return errormsg
+
+ logger.debug("Exiting API: verify_ospf_neighbor()")
+ return result
+
+@retry(attempts=21, wait=2, return_is_str=True)
+def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
+ tag=None, metric=None, fib=None):
+ """
+ This API is to verify ospf routes by running
+ show ip ospf route command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `next_hop` : next to be verified
+ * `tag` : tag to be verified
+ * `metric` : metric to be verified
+ * `fib` : True if the route is installed in FIB.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": ip_net,
+ "no_of_ip": 1,
+ "routeType": "N"
+ }
+ ]
+ }
+ }
+
+ result = verify_ospf_rib(tgen, dut, input_dict,next_hop=nh)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ logger.info("Entering lib API: verify_ospf_rib()")
+ result = False
+ router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ found_hops = []
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ logger.info("Checking router %s RIB:", router)
+
+ # Verifying RIB routes
+ command = "show ip ospf route"
+
+ found_routes = []
+ missing_routes = []
+
+ if "static_routes" in input_dict[routerInput] or \
+ "prefix" in input_dict[routerInput]:
+ if "prefix" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["prefix"]
+ else:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+
+ for static_route in static_routes:
+ cmd = "{}".format(command)
+
+ cmd = "{} json".format(cmd)
+
+ ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary ospf_rib_json is not empty
+ if bool(ospf_rib_json) is False:
+ errormsg = "[DUT: {}] No routes found in OSPF route " \
+ "table".format(router)
+ return errormsg
+
+ network = static_route["network"]
+ no_of_ip = static_route.setdefault("no_of_ip", 1)
+ _tag = static_route.setdefault("tag", None)
+ _rtype = static_route.setdefault("routeType", None)
+
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != 'ipv4':
+ continue
+
+ if st_rt in ospf_rib_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if fib and next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ for mnh in range(0, len(ospf_rib_json[st_rt])):
+ if 'fib' in ospf_rib_json[st_rt][
+ mnh]["nexthops"][0]:
+ found_hops.append([rib_r[
+ "ip"] for rib_r in ospf_rib_json[
+ st_rt][mnh]["nexthops"]])
+
+ if found_hops[0]:
+ missing_list_of_nexthops = \
+ set(found_hops[0]).difference(next_hop)
+ additional_nexthops_in_required_nhs = \
+ set(next_hop).difference(found_hops[0])
+
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Nexthop "
+ "%s is not active for route %s in "
+ "RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt, dut)
+ errormsg = (
+ "Nexthop {} is not active"
+ " for route {} in RIB of router"
+ " {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt, dut))
+ return errormsg
+ else:
+ nh_found = True
+
+ elif next_hop and fib is None:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+ found_hops = [rib_r["ip"] for rib_r in
+ ospf_rib_json[st_rt][
+ "nexthops"]]
+
+ if found_hops:
+ missing_list_of_nexthops = \
+ set(found_hops).difference(next_hop)
+ additional_nexthops_in_required_nhs = \
+ set(next_hop).difference(found_hops)
+
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"\
+ " %s in RIB of router %s\n", \
+ additional_nexthops_in_required_nhs, \
+ st_rt, dut)
+ errormsg=("Nexthop {} is Missing for "\
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt, dut))
+ return errormsg
+ else:
+ nh_found = True
+ if _rtype:
+ if "routeType" not in ospf_rib_json[
+ st_rt]:
+ errormsg = ("[DUT: {}]: routeType missing"
+ "for route {} in OSPF RIB \n".\
+ format(dut, st_rt))
+ return errormsg
+ elif _rtype != ospf_rib_json[st_rt][
+ "routeType"]:
+ errormsg = ("[DUT: {}]: routeType mismatch"
+ "for route {} in OSPF RIB \n".\
+ format(dut, st_rt))
+ return errormsg
+ else:
+ logger.info("DUT: {}]: Found routeType {}"
+ "for route {}".\
+ format(dut, _rtype, st_rt))
+ if tag:
+ if "tag" not in ospf_rib_json[
+ st_rt]:
+ errormsg = ("[DUT: {}]: tag is not"
+ " present for"
+ " route {} in RIB \n".\
+ format(dut, st_rt
+ ))
+ return errormsg
+
+ if _tag != ospf_rib_json[
+ st_rt]["tag"]:
+ errormsg = ("[DUT: {}]: tag value {}"
+ " is not matched for"
+ " route {} in RIB \n".\
+ format(dut, _tag, st_rt,
+ ))
+ return errormsg
+
+ if metric is not None:
+ if "type2cost" not in ospf_rib_json[
+ st_rt]:
+ errormsg = ("[DUT: {}]: metric is"
+ " not present for"
+ " route {} in RIB \n".\
+ format(dut, st_rt))
+ return errormsg
+
+ if metric != ospf_rib_json[
+ st_rt]["type2cost"]:
+ errormsg = ("[DUT: {}]: metric value "
+ "{} is not matched for "
+ "route {} in RIB \n".\
+ format(dut, metric, st_rt,
+ ))
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if nh_found:
+ logger.info("[DUT: {}]: Found next_hop {} for all OSPF"
+ " routes in RIB".format(router, next_hop))
+
+ if len(missing_routes) > 0:
+ errormsg = ("[DUT: {}]: Missing route in RIB, "
+ "routes: {}".\
+ format(dut, missing_routes))
+ return errormsg
+
+ if found_routes:
+ logger.info("[DUT: %s]: Verified routes in RIB, found"
+ " routes are: %s\n", dut, found_routes)
+ result = True
+
+ logger.info("Exiting lib API: verify_ospf_rib()")
+ return result
+
+
+@retry(attempts=10, wait=2, return_is_str=True)
+def verify_ospf_interface(tgen, topo, dut=None,lan=False, input_dict=None):
+ """
+ This API is to verify ospf routes by running
+ show ip ospf interface command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : topology descriptions
+ * `dut`: device under test
+ * `lan`: if set to true this interface belongs to LAN.
+ * `input_dict` : Input dict data, required when configuring from testcase
+
+ Usage
+ -----
+ input_dict= {
+ 'r0': {
+ 'links':{
+ 's1': {
+ 'ospf':{
+ 'priority':98,
+ 'timerDeadSecs': 4,
+ 'area': '0.0.0.3',
+ 'mcastMemberOspfDesignatedRouters': True,
+ 'mcastMemberOspfAllRouters': True,
+ 'ospfEnabled': True,
+
+ }
+ }
+ }
+ }
+ }
+ result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ logger.debug("Entering lib API: verify_ospf_interface()")
+ result = False
+ for router, rnode in tgen.routers().iteritems():
+ if 'ospf' not in topo['routers'][router]:
+ continue
+
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying OSPF interface on router %s:", router)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json",
+ isjson=True)
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ # To find neighbor ip type
+ ospf_intf_data = input_dict[router]["links"]
+ for ospf_intf, intf_data in ospf_intf_data.items():
+ intf = topo['routers'][router]['links'][ospf_intf]['interface']
+ if intf in show_ospf_json['interfaces']:
+ for intf_attribute in intf_data['ospf']:
+ if intf_data['ospf'][intf_attribute] == show_ospf_json[
+ 'interfaces'][intf][intf_attribute]:
+ logger.info("[DUT: %s] OSPF interface %s: %s is %s",
+ router, intf, intf_attribute, intf_data['ospf'][
+ intf_attribute])
+ else:
+ errormsg= "[DUT: {}] OSPF interface {}: {} is {}, \
+ Expected is {}".format(router, intf, intf_attribute,
+ intf_data['ospf'][intf_attribute], show_ospf_json[
+ 'interfaces'][intf][intf_attribute])
+ return errormsg
+ result = True
+ logger.debug("Exiting API: verify_ospf_interface()")
+ return result
+
+
+@retry(attempts=11, wait=2, return_is_str=True)
+def verify_ospf_database(tgen, topo, dut, input_dict):
+ """
+ This API is to verify ospf lsa's by running
+ show ip ospf database command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `topo` : next to be verified
+
+ Usage
+ -----
+ input_dict = {
+ "areas": {
+ "0.0.0.0": {
+ "Router Link States": {
+ "100.1.1.0-100.1.1.0": {
+ "LSID": "100.1.1.0",
+ "Advertised router": "100.1.1.0",
+ "LSA Age": 130,
+ "Sequence Number": "80000006",
+ "Checksum": "a703",
+ "Router links": 3
+ }
+ },
+ "Net Link States": {
+ "10.0.0.2-100.1.1.1": {
+ "LSID": "10.0.0.2",
+ "Advertised router": "100.1.1.1",
+ "LSA Age": 137,
+ "Sequence Number": "80000001",
+ "Checksum": "9583"
+ }
+ },
+ },
+ }
+ }
+ result = verify_ospf_database(tgen, topo, dut, input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ result = False
+ router = dut
+ logger.debug("Entering lib API: verify_ospf_database()")
+
+ if 'ospf' not in topo['routers'][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
+ dut)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("Verifying OSPF interface on router %s:", dut)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json",
+ isjson=True)
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ # for inter and inter lsa's
+ ospf_db_data = input_dict.setdefault("areas", None)
+ ospf_external_lsa = input_dict.setdefault(
+ 'AS External Link States', None)
+ if ospf_db_data:
+ for ospf_area, area_lsa in ospf_db_data.items():
+ if ospf_area in show_ospf_json['areas']:
+ if 'Router Link States' in area_lsa:
+ for lsa in area_lsa['Router Link States']:
+ if lsa in show_ospf_json['areas'][ospf_area][
+ 'Router Link States']:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Router "
+ "LSA %s", router, ospf_area, lsa)
+ result = True
+ else:
+ errormsg = \
+ "[DUT: {}] OSPF LSDB area {}: expected" \
+ " Router LSA is {}".format(router, ospf_area, lsa)
+ return errormsg
+ if 'Net Link States' in area_lsa:
+ for lsa in area_lsa['Net Link States']:
+ if lsa in show_ospf_json['areas'][ospf_area][
+ 'Net Link States']:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Network "
+ "LSA %s", router, ospf_area, lsa)
+ result = True
+ else:
+ errormsg = \
+ "[DUT: {}] OSPF LSDB area {}: expected" \
+ " Network LSA is {}".format(router, ospf_area, lsa)
+ return errormsg
+ if 'Summary Link States' in area_lsa:
+ for lsa in area_lsa['Summary Link States']:
+ if lsa in show_ospf_json['areas'][ospf_area][
+ 'Summary Link States']:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Summary "
+ "LSA %s", router, ospf_area, lsa)
+ result = True
+ else:
+ errormsg = \
+ "[DUT: {}] OSPF LSDB area {}: expected" \
+ " Summary LSA is {}".format(router, ospf_area, lsa)
+ return errormsg
+ if 'ASBR-Summary Link States' in area_lsa:
+ for lsa in area_lsa['ASBR-Summary Link States']:
+ if lsa in show_ospf_json['areas'][ospf_area][
+ 'ASBR-Summary Link States']:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:ASBR Summary "
+ "LSA %s", router, ospf_area, lsa)
+ result = True
+ else:
+ errormsg = \
+ "[DUT: {}] OSPF LSDB area {}: expected" \
+ " ASBR Summary LSA is {}".format(
+ router, ospf_area, lsa)
+ return errormsg
+ if ospf_external_lsa:
+ for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items():
+ if ospf_ext_lsa in show_ospf_json['AS External Link States']:
+ logger.info(
+ "[DUT: %s] OSPF LSDB:External LSA %s",
+ router, ospf_ext_lsa)
+ result = True
+ else:
+ errormsg = \
+ "[DUT: {}] OSPF LSDB : expected" \
+ " External LSA is {}".format(router, ospf_ext_lsa)
+ return errormsg
+
+ logger.debug("Exiting API: verify_ospf_database()")
+ return result
+
+
+
+@retry(attempts=10, wait=2, return_is_str=True)
+def verify_ospf_summary(tgen, topo, dut, input_dict):
+ """
+ This API is to verify ospf routes by running
+ show ip ospf interface command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : topology descriptions
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+
+ Usage
+ -----
+ input_dict = {
+ "11.0.0.0/8": {
+ "Summary address": "11.0.0.0/8",
+ "Metric-type": "E2",
+ "Metric": 20,
+ "Tag": 0,
+ "External route count": 5
+ }
+ }
+ result = verify_ospf_summary(tgen, topo, dut, input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ logger.debug("Entering lib API: verify_ospf_summary()")
+ result = False
+ router = dut
+
+ logger.info("Verifying OSPF summary on router %s:", router)
+
+ if 'ospf' not in topo['routers'][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
+ router)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json",
+ isjson=True)
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ # To find neighbor ip type
+ ospf_summary_data = input_dict
+ for ospf_summ, summ_data in ospf_summary_data.items():
+ if ospf_summ not in show_ospf_json:
+ continue
+ summary = ospf_summary_data[ospf_summ]['Summary address']
+ if summary in show_ospf_json:
+ for summ in summ_data:
+ if summ_data[summ] == show_ospf_json[summary][summ]:
+ logger.info("[DUT: %s] OSPF summary %s:%s is %s",
+ router, summary, summ, summ_data[summ])
+ result = True
+ else:
+ errormsg = ("[DUT: {}] OSPF summary {}:{} is %s, "
+ "Expected is {}".format(router,summary, summ,
+ show_ospf_json[summary][summ]))
+ return errormsg
+
+ logger.debug("Exiting API: verify_ospf_summary()")
+ return result