diff options
Diffstat (limited to 'tests/topotests/lib/ospf.py')
| -rw-r--r-- | tests/topotests/lib/ospf.py | 1310 |
1 files changed, 789 insertions, 521 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index dc9fe0fcca..beac768905 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -18,7 +18,6 @@ # OF THIS SOFTWARE. # -import traceback import ipaddr import ipaddress import sys @@ -28,9 +27,11 @@ from time import sleep from lib.topolog import logger from lib.topotest import frr_unicode from ipaddress import IPv6Address +import sys + # Import common_config to use commomnly used APIs from lib.common_config import ( - create_common_configuration, + create_common_configurations, InvalidCLIError, retry, generate_ips, @@ -84,32 +85,36 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru topo = topo["routers"] input_dict = deepcopy(input_dict) - for router in input_dict.keys(): - if "ospf" not in input_dict[router]: - logger.debug("Router %s: 'ospf' not present in input_dict", router) - continue - - result = __create_ospf_global( - tgen, input_dict, router, build, load_config) - if result is True: - ospf_data = input_dict[router]["ospf"] + for ospf in ["ospf", "ospf6"]: + config_data_dict = {} - for router in input_dict.keys(): - if "ospf6" not in input_dict[router]: - logger.debug("Router %s: 'ospf6' not present in input_dict", router) - continue + for router in input_dict.keys(): + if ospf not in input_dict[router]: + logger.debug("Router %s: %s not present in input_dict", router, ospf) + continue - result = __create_ospf_global( - tgen, input_dict, router, build, load_config, ospf='ospf6') - if result is True: - ospf_data = input_dict[router]["ospf6"] + config_data = __create_ospf_global( + tgen, input_dict, router, build, load_config, ospf + ) + if config_data: + if router not in config_data_dict: + config_data_dict[router] = config_data + else: + config_data_dict[router].extend(config_data) + try: + result = create_common_configurations( + tgen, config_data_dict, ospf, build, load_config + ) + except InvalidCLIError: + logger.error("create_router_ospf (ipv4)", exc_info=True) + result = False logger.debug("Exiting lib API: create_router_ospf()") return result def __create_ospf_global( - tgen, input_dict, router, build=False, load_config=True, ospf="ospf" + tgen, input_dict, router, build, load_config, ospf ): """ Helper API to create ospf global configuration. @@ -131,12 +136,12 @@ def __create_ospf_global( "links": { "r3": { "ipv6": "2013:13::1/64", - "ospf6": { + "ospf6": { "hello_interval": 1, "dead_interval": 4, "network": "point-to-point" } - } + } }, "ospf6": { "router_id": "1.1.1.1", @@ -151,194 +156,221 @@ def __create_ospf_global( Returns ------- - True or False + list of configuration commands """ - result = False - logger.debug("Entering lib API: __create_ospf_global()") - try: + config_data = [] - ospf_data = input_dict[router][ospf] - del_ospf_action = ospf_data.setdefault("delete", False) - if del_ospf_action: - config_data = ["no router {}".format(ospf)] - result = create_common_configuration( - tgen, router, config_data, ospf, build, load_config - ) - return result + if ospf not in input_dict[router]: + return config_data - config_data = [] - cmd = "router {}".format(ospf) + logger.debug("Entering lib API: __create_ospf_global()") + ospf_data = input_dict[router][ospf] + del_ospf_action = ospf_data.setdefault("delete", False) + if del_ospf_action: + config_data = ["no router {}".format(ospf)] + return config_data + + cmd = "router {}".format(ospf) + + config_data.append(cmd) + + # router id + router_id = ospf_data.setdefault("router_id", None) + del_router_id = ospf_data.setdefault("del_router_id", False) + if del_router_id: + config_data.append("no {} router-id".format(ospf)) + if router_id: + config_data.append("{} router-id {}".format(ospf, router_id)) + + # log-adjacency-changes + log_adj_changes = ospf_data.setdefault("log_adj_changes", None) + del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False) + if del_log_adj_changes: + config_data.append("no log-adjacency-changes detail") + if log_adj_changes: + config_data.append("log-adjacency-changes {}".format( + log_adj_changes)) + + # aggregation timer + aggr_timer = ospf_data.setdefault("aggr_timer", None) + del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False) + if del_aggr_timer: + config_data.append("no aggregation timer") + if aggr_timer: + config_data.append("aggregation timer {}".format( + aggr_timer)) + + # maximum path information + ecmp_data = ospf_data.setdefault("maximum-paths", {}) + if ecmp_data: + cmd = "maximum-paths {}".format(ecmp_data) + del_action = ospf_data.setdefault("del_max_path", False) + if del_action: + cmd = "no maximum-paths" config_data.append(cmd) + # redistribute command + redistribute_data = ospf_data.setdefault("redistribute", {}) + if redistribute_data: + for redistribute in redistribute_data: + if "redist_type" not in redistribute: + logger.debug( + "Router %s: 'redist_type' not present in " "input_dict", router + ) + else: + cmd = "redistribute {}".format(redistribute["redist_type"]) + for red_type in redistribute_data: + if "route_map" in red_type: + cmd = cmd + " route-map {}".format(red_type["route_map"]) + del_action = redistribute.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) - # router id - router_id = ospf_data.setdefault("router_id", None) - del_router_id = ospf_data.setdefault("del_router_id", False) - if del_router_id: - config_data.append("no {} router-id".format(ospf)) - if router_id: - config_data.append("{} router-id {}".format(ospf, router_id)) - - # log-adjacency-changes - log_adj_changes = ospf_data.setdefault("log_adj_changes", None) - del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False) - if del_log_adj_changes: - config_data.append("no log-adjacency-changes detail") - if log_adj_changes: - config_data.append("log-adjacency-changes {}".format( - log_adj_changes)) - - # aggregation timer - aggr_timer = ospf_data.setdefault("aggr_timer", None) - del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False) - if del_aggr_timer: - config_data.append("no aggregation timer") - if aggr_timer: - config_data.append("aggregation timer {}".format( - aggr_timer)) - - # maximum path information - ecmp_data = ospf_data.setdefault("maximum-paths", {}) - if ecmp_data: - cmd = "maximum-paths {}".format(ecmp_data) - del_action = ospf_data.setdefault("del_max_path", False) - if del_action: - cmd = "no maximum-paths" - config_data.append(cmd) - - # redistribute command - redistribute_data = ospf_data.setdefault("redistribute", {}) - if redistribute_data: - for redistribute in redistribute_data: - if "redist_type" not in redistribute: - logger.debug( - "Router %s: 'redist_type' not present in " "input_dict", router - ) - else: - cmd = "redistribute {}".format(redistribute["redist_type"]) - for red_type in redistribute_data: - if "route_map" in red_type: - cmd = cmd + " route-map {}".format(red_type["route_map"]) - del_action = redistribute.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) + # area information + area_data = ospf_data.setdefault("area", {}) + if area_data: + for area in area_data: + if "id" not in area: + logger.debug( + "Router %s: 'area id' not present in " "input_dict", router + ) + else: + cmd = "area {}".format(area["id"]) - # area information - area_data = ospf_data.setdefault("area", {}) - if area_data: - for area in area_data: - if "id" not in area: - logger.debug( - "Router %s: 'area id' not present in " "input_dict", router - ) - else: - cmd = "area {}".format(area["id"]) + if "type" in area: + cmd = cmd + " {}".format(area["type"]) - if "type" in area: - cmd = cmd + " {}".format(area["type"]) + del_action = area.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) - del_action = area.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) + #def route information + def_rte_data = ospf_data.setdefault("default-information", {}) + if def_rte_data: + if "originate" not in def_rte_data: + logger.debug("Router %s: 'originate key' not present in " + "input_dict", router) + else: + cmd = "default-information originate" - #def route information - def_rte_data = ospf_data.setdefault("default-information", {}) - if def_rte_data: - if "originate" not in def_rte_data: - logger.debug("Router %s: 'originate key' not present in " - "input_dict", router) - else: - cmd = "default-information originate" + if "always" in def_rte_data: + cmd = cmd + " always" - if "always" in def_rte_data: - cmd = cmd + " always" + if "metric" in def_rte_data: + cmd = cmd + " metric {}".format(def_rte_data["metric"]) - if "metric" in def_rte_data: - cmd = cmd + " metric {}".format(def_rte_data["metric"]) + if "metric-type" in def_rte_data: + cmd = cmd + " metric-type {}".format(def_rte_data[ + "metric-type"]) - if "metric-type" in def_rte_data: - cmd = cmd + " metric-type {}".format(def_rte_data[ - "metric-type"]) + if "route-map" in def_rte_data: + cmd = cmd + " route-map {}".format(def_rte_data["route-map"]) - if "route-map" in def_rte_data: - cmd = cmd + " route-map {}".format(def_rte_data[ - "route-map"]) + del_action = def_rte_data.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) - del_action = def_rte_data.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) + # area interface information for ospf6d only + if ospf == "ospf6": + area_iface = ospf_data.setdefault("neighbors", {}) + if area_iface: + for neighbor in area_iface: + if "area" in area_iface[neighbor]: + iface = input_dict[router]["links"][neighbor]["interface"] + cmd = "interface {} area {}".format( + iface, area_iface[neighbor]["area"] + ) + if area_iface[neighbor].setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) - # area interface information for ospf6d only - if ospf == "ospf6": - area_iface = ospf_data.setdefault("neighbors", {}) - if area_iface: - for neighbor in area_iface: - if "area" in area_iface[neighbor]: + try: + if "area" in input_dict[router]['links'][neighbor][ + 'ospf6']: iface = input_dict[router]["links"][neighbor]["interface"] cmd = "interface {} area {}".format( - iface, area_iface[neighbor]["area"] - ) - if area_iface[neighbor].setdefault("delete", False): + iface, input_dict[router]['links'][neighbor][ + 'ospf6']['area']) + if input_dict[router]['links'][neighbor].setdefault( + "delete", False): cmd = "no {}".format(cmd) config_data.append(cmd) + except KeyError: + pass - try: - if "area" in input_dict[router]['links'][neighbor][ - 'ospf6']: - iface = input_dict[router]["links"][neighbor]["interface"] - cmd = "interface {} area {}".format( - iface, input_dict[router]['links'][neighbor][ - 'ospf6']['area']) - if input_dict[router]['links'][neighbor].setdefault( - "delete", False): - cmd = "no {}".format(cmd) - config_data.append(cmd) - except KeyError: - pass + # summary information + summary_data = ospf_data.setdefault("summary-address", {}) + if summary_data: + for summary in summary_data: + if "prefix" not in summary: + logger.debug( + "Router %s: 'summary-address' not present in " "input_dict", + router, + ) + else: + cmd = "summary {}/{}".format(summary["prefix"], summary["mask"]) - # summary information - summary_data = ospf_data.setdefault("summary-address", {}) - if summary_data: - for summary in summary_data: - if "prefix" not in summary: - logger.debug( - "Router %s: 'summary-address' not present in " "input_dict", - router, - ) - else: - cmd = "summary {}/{}".format(summary["prefix"], summary["mask"]) + _tag = summary.setdefault("tag", None) + if _tag: + cmd = "{} tag {}".format(cmd, _tag) - _tag = summary.setdefault("tag", None) - if _tag: - cmd = "{} tag {}".format(cmd, _tag) + _advertise = summary.setdefault("advertise", True) + if not _advertise: + cmd = "{} no-advertise".format(cmd) - _advertise = summary.setdefault("advertise", True) - if not _advertise: - cmd = "{} no-advertise".format(cmd) + del_action = summary.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) - del_action = summary.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) + # ospf gr information + gr_data = ospf_data.setdefault("graceful-restart", {}) + if gr_data: - result = create_common_configuration( - tgen, router, config_data, ospf, build, load_config - ) + if "opaque" in gr_data and gr_data["opaque"]: + cmd = "capability opaque" + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) - except InvalidCLIError: - # Traceback - errormsg = traceback.format_exc() - logger.error(errormsg) - return errormsg + if "helper-only" in gr_data and not gr_data["helper-only"]: + cmd = "graceful-restart helper-only" + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list: + for rtrs in gr_data["helper-only"]: + cmd = "graceful-restart helper-only {}".format(rtrs) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + + if "helper" in gr_data: + if type(gr_data["helper"]) is not list: + gr_data["helper"] = list(gr_data["helper"]) + for helper_role in gr_data["helper"]: + cmd = "graceful-restart helper {}".format(helper_role) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + + if "supported-grace-time" in gr_data: + cmd = "graceful-restart helper supported-grace-time {}".format( + gr_data["supported-grace-time"] + ) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) logger.debug("Exiting lib API: create_ospf_global()") - return result + + return config_data def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True): @@ -373,14 +405,27 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr else: topo = topo["routers"] input_dict = deepcopy(input_dict) + + config_data_dict = {} + for router in input_dict.keys(): if "ospf6" not in input_dict[router]: logger.debug("Router %s: 'ospf6' not present in input_dict", router) continue - result = __create_ospf_global( + config_data = __create_ospf_global( tgen, input_dict, router, build, load_config, "ospf6" ) + if config_data: + config_data_dict[router] = config_data + + try: + result = create_common_configurations( + tgen, config_data_dict, "ospf6", build, load_config + ) + except InvalidCLIError: + logger.error("create_router_ospf6", exc_info=True) + result = False logger.debug("Exiting lib API: create_router_ospf6()") return result @@ -420,10 +465,14 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config= True or False """ logger.debug("Enter lib config_ospf_interface") + result = False if not input_dict: input_dict = deepcopy(topo) else: input_dict = deepcopy(input_dict) + + config_data_dict = {} + for router in input_dict.keys(): config_data = [] for lnk in input_dict[router]["links"].keys(): @@ -502,16 +551,20 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config= # interface ospf mtu if data_ospf_mtu: cmd = "ip ospf mtu-ignore" - if 'del_action' in ospf_data: + if "del_action" in ospf_data: cmd = "no {}".format(cmd) config_data.append(cmd) if build: return config_data - else: - result = create_common_configuration( - tgen, router, config_data, "interface_config", build=build - ) + + if config_data: + config_data_dict[router] = config_data + + result = create_common_configurations( + tgen, config_data_dict, "interface_config", build=build + ) + logger.debug("Exiting lib API: config_ospf_interface()") return result @@ -543,8 +596,7 @@ def clear_ospf(tgen, router, ospf=None): version = "ip" cmd = "clear {} ospf interface".format(version) - logger.info( - "Clearing ospf process on router %s.. using command '%s'", router, cmd) + logger.info("Clearing ospf process on router %s.. using command '%s'", router, cmd) run_frr_cmd(rnode, cmd) logger.debug("Exiting lib API: clear_ospf()") @@ -774,7 +826,7 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec ################################ # Verification procs ################################ -@retry(retry_timeout=20) +@retry(retry_timeout=50) def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False): """ This API is to verify ospf neighborship by running @@ -825,105 +877,133 @@ def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False): if input_dict: for router, rnode in tgen.routers().items(): - if 'ospf6' not in topo['routers'][router]: + if "ospf6" not in topo["routers"][router]: continue if dut is not None and dut != router: continue logger.info("Verifying OSPF neighborship on router %s:", router) - show_ospf_json = run_frr_cmd(rnode, - "show ipv6 ospf neighbor json", isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ipv6 ospf neighbor json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): errormsg = "OSPF6 is not running" return errormsg ospf_data_list = input_dict[router]["ospf6"] - ospf_nbr_list = ospf_data_list['neighbors'] + ospf_nbr_list = ospf_data_list["neighbors"] for ospf_nbr, nbr_data in ospf_nbr_list.items(): - data_ip = data_rid = topo['routers'][ospf_nbr]['ospf6']['router_id'] + + try: + data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"] + except KeyError: + data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][ + "router_id" + ] + if ospf_nbr in data_ip: nbr_details = nbr_data[ospf_nbr] elif lan: - for switch in topo['switches']: - if 'ospf6' in topo['switches'][switch]['links'][router]: + for switch in topo["switches"]: + if "ospf6" in topo["switches"][switch]["links"][router]: neighbor_ip = data_ip else: continue else: - neighbor_ip = data_ip[router]['ipv6'].split("/")[0] + neighbor_ip = data_ip[router]["ipv6"].split("/")[0] nh_state = None neighbor_ip = neighbor_ip.lower() nbr_rid = data_rid - get_index_val = dict((d['neighborId'], dict( \ - d, index=index)) for (index, d) in enumerate( \ - show_ospf_json['neighbors'])) + get_index_val = dict( + (d["neighborId"], dict(d, index=index)) + for (index, d) in enumerate(show_ospf_json["neighbors"]) + ) try: - nh_state = get_index_val.get(neighbor_ip)['state'] - intf_state = get_index_val.get(neighbor_ip)['ifState'] + nh_state = get_index_val.get(neighbor_ip)["state"] + intf_state = get_index_val.get(neighbor_ip)["ifState"] except TypeError: - errormsg = "[DUT: {}] OSPF peer {} missing,from "\ - "{} ".format(router, - nbr_rid, ospf_nbr) + errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format( + router, nbr_rid, ospf_nbr + ) return errormsg - nbr_state = nbr_data.setdefault("state",None) - nbr_role = nbr_data.setdefault("role",None) + nbr_state = nbr_data.setdefault("state", None) + nbr_role = nbr_data.setdefault("role", None) if nbr_state: if nbr_state == nh_state: - logger.info("[DUT: {}] OSPF6 Nbr is {}:{} State {}".format - (router, ospf_nbr, nbr_rid, nh_state)) + logger.info( + "[DUT: {}] OSPF6 Nbr is {}:{} State {}".format( + router, ospf_nbr, nbr_rid, nh_state + ) + ) result = True else: - errormsg = ("[DUT: {}] OSPF6 is not Converged, neighbor" - " state is {} , Expected state is {}".format(router, - nh_state, nbr_state)) + errormsg = ( + "[DUT: {}] OSPF6 is not Converged, neighbor" + " state is {} , Expected state is {}".format( + router, nh_state, nbr_state + ) + ) return errormsg if nbr_role: if nbr_role == intf_state: - logger.info("[DUT: {}] OSPF6 Nbr is {}: {} Role {}".format( - router, ospf_nbr, nbr_rid, nbr_role)) + logger.info( + "[DUT: {}] OSPF6 Nbr is {}: {} Role {}".format( + router, ospf_nbr, nbr_rid, nbr_role + ) + ) else: - errormsg = ("[DUT: {}] OSPF6 is not Converged with rid" - "{}, role is {}, Expected role is {}".format(router, - nbr_rid, intf_state, nbr_role)) + errormsg = ( + "[DUT: {}] OSPF6 is not Converged with rid" + "{}, role is {}, Expected role is {}".format( + router, nbr_rid, intf_state, nbr_role + ) + ) return errormsg continue else: for router, rnode in tgen.routers().items(): - if 'ospf6' not in topo['routers'][router]: + if "ospf6" not in topo["routers"][router]: continue if dut is not None and dut != router: continue logger.info("Verifying OSPF6 neighborship on router %s:", router) - show_ospf_json = run_frr_cmd(rnode, - "show ipv6 ospf neighbor json", isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ipv6 ospf neighbor json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): errormsg = "OSPF6 is not running" return errormsg ospf_data_list = topo["routers"][router]["ospf6"] - ospf_neighbors = ospf_data_list['neighbors'] + ospf_neighbors = ospf_data_list["neighbors"] total_peer = 0 total_peer = len(ospf_neighbors.keys()) no_of_ospf_nbr = 0 - ospf_nbr_list = ospf_data_list['neighbors'] + ospf_nbr_list = ospf_data_list["neighbors"] no_of_peer = 0 for ospf_nbr, nbr_data in ospf_nbr_list.items(): - data_ip = data_rid = topo['routers'][ospf_nbr]['ospf6']['router_id'] + try: + data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"] + except KeyError: + data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][ + "router_id" + ] + if ospf_nbr in data_ip: nbr_details = nbr_data[ospf_nbr] elif lan: - for switch in topo['switches']: - if 'ospf6' in topo['switches'][switch]['links'][router]: + for switch in topo["switches"]: + if "ospf6" in topo["switches"][switch]["links"][router]: neighbor_ip = data_ip else: continue @@ -933,26 +1013,27 @@ def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False): nh_state = None neighbor_ip = neighbor_ip.lower() nbr_rid = data_rid - get_index_val = dict((d['neighborId'], dict( \ - d, index=index)) for (index, d) in enumerate( \ - show_ospf_json['neighbors'])) + get_index_val = dict( + (d["neighborId"], dict(d, index=index)) + for (index, d) in enumerate(show_ospf_json["neighbors"]) + ) try: - nh_state = get_index_val.get(neighbor_ip)['state'] - intf_state = get_index_val.get(neighbor_ip)['ifState'] + nh_state = get_index_val.get(neighbor_ip)["state"] + intf_state = get_index_val.get(neighbor_ip)["ifState"] except TypeError: - errormsg = "[DUT: {}] OSPF peer {} missing,from "\ - "{} ".format(router, - nbr_rid, ospf_nbr) + errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format( + router, nbr_rid, ospf_nbr + ) return errormsg - if nh_state == 'Full': + if nh_state == "Full": no_of_peer += 1 if no_of_peer == total_peer: logger.info("[DUT: {}] OSPF6 is Converged".format(router)) result = True else: - errormsg = ("[DUT: {}] OSPF6 is not Converged".format(router)) + errormsg = "[DUT: {}] OSPF6 is not Converged".format(router) return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) @@ -1491,7 +1572,7 @@ def verify_ospf_database(tgen, topo, dut, input_dict, expected=True): @retry(retry_timeout=20) -def verify_ospf_summary(tgen, topo, dut, input_dict, expected=True): +def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): """ This API is to verify ospf routes by running show ip ospf interface command. @@ -1502,7 +1583,6 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, expected=True): * `topo` : topology descriptions * `dut`: device under test * `input_dict` : Input dict data, required when configuring from testcase - * `expected` : expected results from API, by-default True Usage ----- @@ -1522,18 +1602,30 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, expected=True): True or False (Error Message) """ - logger.debug("Entering lib API: verify_ospf_summary()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False router = dut logger.info("Verifying OSPF summary on router %s:", router) - if "ospf" not in topo["routers"][dut]: - errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router) - return errormsg - rnode = tgen.routers()[dut] - show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", isjson=True) + + if ospf: + if 'ospf6' not in topo['routers'][dut]: + errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format( + router) + return errormsg + + show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf summary detail json", + isjson=True) + else: + if 'ospf' not in topo['routers'][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format( + router) + return errormsg + + show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", + isjson=True) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): @@ -1542,35 +1634,31 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, expected=True): # To find neighbor ip type ospf_summary_data = input_dict + + if ospf: + show_ospf_json = show_ospf_json['default'] + for ospf_summ, summ_data in ospf_summary_data.items(): if ospf_summ not in show_ospf_json: continue - summary = ospf_summary_data[ospf_summ]["Summary address"] + summary = ospf_summary_data[ospf_summ]['Summary address'] + if summary in show_ospf_json: for summ in summ_data: if summ_data[summ] == show_ospf_json[summary][summ]: - logger.info( - "[DUT: %s] OSPF summary %s:%s is %s", - router, - summary, - summ, - summ_data[summ], - ) + logger.info("[DUT: %s] OSPF summary %s:%s is %s", + router, summary, summ, summ_data[summ]) result = True else: - errormsg = ( - "[DUT: {}] OSPF summary {}:{} is %s, " - "Expected is {}".format( - router, summary, summ, show_ospf_json[summary][summ] - ) - ) + errormsg = ("[DUT: {}] OSPF summary {} : {} is {}, " + "Expected is {}".format(router, summary, summ,show_ospf_json[ + summary][summ], summ_data[summ] )) return errormsg - logger.debug("Exiting API: verify_ospf_summary()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result - @retry(retry_timeout=30) def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None): @@ -1627,31 +1715,34 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, found_routes = [] missing_routes = [] - if "static_routes" in input_dict[routerInput] or \ - "prefix" in input_dict[routerInput]: + if ( + "static_routes" in input_dict[routerInput] + or "prefix" in input_dict[routerInput] + ): if "prefix" in input_dict[routerInput]: static_routes = input_dict[routerInput]["prefix"] else: static_routes = input_dict[routerInput]["static_routes"] - for static_route in static_routes: cmd = "{}".format(command) cmd = "{} json".format(cmd) - ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True) + ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True) # Fix for PR 2644182 try: - ospf_rib_json = ospf_rib_json['routes'] + ospf_rib_json = ospf_rib_json["routes"] except KeyError: pass # Verifying output dictionary ospf_rib_json is not empty if bool(ospf_rib_json) is False: - errormsg = "[DUT: {}] No routes found in OSPF6 route " \ + errormsg = ( + "[DUT: {}] No routes found in OSPF6 route " "table".format(router) + ) return errormsg network = static_route["network"] @@ -1659,7 +1750,6 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, _tag = static_route.setdefault("tag", None) _rtype = static_route.setdefault("routeType", None) - # Generating IPs for verification ip_list = generate_ips(network, no_of_ip) st_found = False @@ -1668,7 +1758,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, st_rt = str(ipaddress.ip_network(frr_unicode(st_rt))) _addr_type = validate_ip_address(st_rt) - if _addr_type != 'ipv6': + if _addr_type != "ipv6": continue if st_rt in ospf_rib_json: @@ -1681,17 +1771,26 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, next_hop = [next_hop] for mnh in range(0, len(ospf_rib_json[st_rt])): - if 'fib' in ospf_rib_json[st_rt][ - mnh]["nextHops"][0]: - found_hops.append([rib_r[ - "ip"] for rib_r in ospf_rib_json[ - st_rt][mnh]["nextHops"]]) + if ( + "fib" + in ospf_rib_json[st_rt][mnh]["nextHops"][0] + ): + found_hops.append( + [ + rib_r["ip"] + for rib_r in ospf_rib_json[st_rt][mnh][ + "nextHops" + ] + ] + ) if found_hops[0]: - missing_list_of_nexthops = \ - set(found_hops[0]).difference(next_hop) - additional_nexthops_in_required_nhs = \ - set(next_hop).difference(found_hops[0]) + missing_list_of_nexthops = set( + found_hops[0] + ).difference(next_hop) + additional_nexthops_in_required_nhs = set( + next_hop + ).difference(found_hops[0]) if additional_nexthops_in_required_nhs: logger.info( @@ -1699,13 +1798,18 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, "%s is not active for route %s in " "RIB of router %s\n", additional_nexthops_in_required_nhs, - st_rt, dut) + st_rt, + dut, + ) errormsg = ( "Nexthop {} is not active" " for route {} in RIB of router" " {}\n".format( - additional_nexthops_in_required_nhs, - st_rt, dut)) + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) return errormsg else: nh_found = True @@ -1713,98 +1817,118 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, elif next_hop and fib is None: if type(next_hop) is not list: next_hop = [next_hop] - found_hops = [rib_r['nextHop'] for rib_r in - ospf_rib_json[st_rt][ - "nextHops"]] + found_hops = [ + rib_r["nextHop"] + for rib_r in ospf_rib_json[st_rt]["nextHops"] + ] if found_hops: - missing_list_of_nexthops = \ - set(found_hops).difference(next_hop) - additional_nexthops_in_required_nhs = \ - set(next_hop).difference(found_hops) + missing_list_of_nexthops = set( + found_hops + ).difference(next_hop) + additional_nexthops_in_required_nhs = set( + next_hop + ).difference(found_hops) if additional_nexthops_in_required_nhs: logger.info( - "Missing nexthop %s for route"\ - " %s in RIB of router %s\n", \ - additional_nexthops_in_required_nhs, \ - st_rt, dut) - errormsg=("Nexthop {} is Missing for "\ - "route {} in RIB of router {}\n".format( + "Missing nexthop %s for route" + " %s in RIB of router %s\n", additional_nexthops_in_required_nhs, - st_rt, dut)) + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) return errormsg else: nh_found = True if _rtype: - if "destinationType" not in ospf_rib_json[ - st_rt]: - errormsg = ("[DUT: {}]: destinationType missing" - "for route {} in OSPF RIB \n".\ - format(dut, st_rt)) + if "destinationType" not in ospf_rib_json[st_rt]: + errormsg = ( + "[DUT: {}]: destinationType missing" + "for route {} in OSPF RIB \n".format(dut, st_rt) + ) return errormsg - elif _rtype != ospf_rib_json[st_rt][ - "destinationType"]: - errormsg = ("[DUT: {}]: destinationType mismatch" - "for route {} in OSPF RIB \n".\ - format(dut, st_rt)) + elif _rtype != ospf_rib_json[st_rt]["destinationType"]: + errormsg = ( + "[DUT: {}]: destinationType mismatch" + "for route {} in OSPF RIB \n".format(dut, st_rt) + ) return errormsg else: - logger.info("DUT: {}]: Found destinationType {}" - "for route {}".\ - format(dut, _rtype, st_rt)) + logger.info( + "DUT: {}]: Found destinationType {}" + "for route {}".format(dut, _rtype, st_rt) + ) if tag: - if "tag" not in ospf_rib_json[ - st_rt]: - errormsg = ("[DUT: {}]: tag is not" - " present for" - " route {} in RIB \n".\ - format(dut, st_rt - )) + if "tag" not in ospf_rib_json[st_rt]: + errormsg = ( + "[DUT: {}]: tag is not" + " present for" + " route {} in RIB \n".format(dut, st_rt) + ) return errormsg - if _tag != ospf_rib_json[ - st_rt]["tag"]: - errormsg = ("[DUT: {}]: tag value {}" - " is not matched for" - " route {} in RIB \n".\ - format(dut, _tag, st_rt, - )) + if _tag != ospf_rib_json[st_rt]["tag"]: + errormsg = ( + "[DUT: {}]: tag value {}" + " is not matched for" + " route {} in RIB \n".format( + dut, + _tag, + st_rt, + ) + ) return errormsg if metric is not None: - if "type2cost" not in ospf_rib_json[ - st_rt]: - errormsg = ("[DUT: {}]: metric is" - " not present for" - " route {} in RIB \n".\ - format(dut, st_rt)) + if "type2cost" not in ospf_rib_json[st_rt]: + errormsg = ( + "[DUT: {}]: metric is" + " not present for" + " route {} in RIB \n".format(dut, st_rt) + ) return errormsg - if metric != ospf_rib_json[ - st_rt]["type2cost"]: - errormsg = ("[DUT: {}]: metric value " - "{} is not matched for " - "route {} in RIB \n".\ - format(dut, metric, st_rt, - )) + if metric != ospf_rib_json[st_rt]["type2cost"]: + errormsg = ( + "[DUT: {}]: metric value " + "{} is not matched for " + "route {} in RIB \n".format( + dut, + metric, + st_rt, + ) + ) return errormsg else: missing_routes.append(st_rt) if nh_found: - logger.info("[DUT: {}]: Found next_hop {} for all OSPF" - " routes in RIB".format(router, next_hop)) + logger.info( + "[DUT: {}]: Found next_hop {} for all OSPF" + " routes in RIB".format(router, next_hop) + ) if len(missing_routes) > 0: - errormsg = ("[DUT: {}]: Missing route in RIB, " - "routes: {}".\ - format(dut, missing_routes)) + errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format( + dut, missing_routes + ) return errormsg if found_routes: - logger.info("[DUT: %s]: Verified routes in RIB, found" - " routes are: %s\n", dut, found_routes) + logger.info( + "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n", + dut, + found_routes, + ) result = True logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) @@ -1855,15 +1979,16 @@ def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None): result = False for router, rnode in tgen.routers().iteritems(): - if 'ospf6' not in topo['routers'][router]: + if "ospf6" not in topo["routers"][router]: continue if dut is not None and dut != router: continue logger.info("Verifying OSPF interface on router %s:", router) - show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf interface json", - isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ipv6 ospf interface json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): @@ -1873,32 +1998,49 @@ def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None): # To find neighbor ip type ospf_intf_data = input_dict[router]["links"] for ospf_intf, intf_data in ospf_intf_data.items(): - intf = topo['routers'][router]['links'][ospf_intf]['interface'] - if intf in show_ospf_json: - for intf_attribute in intf_data['ospf6']: - if intf_data['ospf6'][intf_attribute] is not list: - if intf_data['ospf6'][intf_attribute] == show_ospf_json[ - intf][intf_attribute]: - logger.info("[DUT: %s] OSPF6 interface %s: %s is %s", - router, intf, intf_attribute, intf_data['ospf6'][ - intf_attribute]) - elif intf_data['ospf6'][intf_attribute] is list: + intf = topo["routers"][router]["links"][ospf_intf]["interface"] + if intf in show_ospf_json: + for intf_attribute in intf_data["ospf6"]: + if intf_data["ospf6"][intf_attribute] is not list: + if ( + intf_data["ospf6"][intf_attribute] + == show_ospf_json[intf][intf_attribute] + ): + logger.info( + "[DUT: %s] OSPF6 interface %s: %s is %s", + router, + intf, + intf_attribute, + intf_data["ospf6"][intf_attribute], + ) + elif intf_data["ospf6"][intf_attribute] is list: for addr_list in len(show_ospf_json[intf][intf_attribute]): - if show_ospf_json[intf][intf_attribute][addr_list][ - 'address'].split('/')[0] == intf_data['ospf6'][ - 'internetAddress'][0]['address']: - break + if ( + show_ospf_json[intf][intf_attribute][addr_list][ + "address" + ].split("/")[0] + == intf_data["ospf6"]["internetAddress"][0]["address"] + ): + break else: - errormsg= "[DUT: {}] OSPF6 interface {}: {} is {}, \ - Expected is {}".format(router, intf, intf_attribute, - intf_data['ospf6'][intf_attribute], intf_data['ospf6'][ - intf_attribute]) + errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \ + Expected is {}".format( + router, + intf, + intf_attribute, + intf_data["ospf6"][intf_attribute], + intf_data["ospf6"][intf_attribute], + ) return errormsg else: - errormsg= "[DUT: {}] OSPF6 interface {}: {} is {}, \ - Expected is {}".format(router, intf, intf_attribute, - intf_data['ospf6'][intf_attribute], intf_data['ospf6'][ - intf_attribute]) + errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \ + Expected is {}".format( + router, + intf, + intf_attribute, + intf_data["ospf6"][intf_attribute], + intf_data["ospf6"][intf_attribute], + ) return errormsg result = True logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) @@ -1956,16 +2098,14 @@ def verify_ospf6_database(tgen, topo, dut, input_dict): router = dut logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) - if 'ospf' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF is not configured on the router.".format( - dut) + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut) return errormsg rnode = tgen.routers()[dut] logger.info("Verifying OSPF interface on router %s:", dut) - show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", - isjson=True) + show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): errormsg = "OSPF is not running" @@ -1973,167 +2113,209 @@ def verify_ospf6_database(tgen, topo, dut, input_dict): # for inter and inter lsa's ospf_db_data = input_dict.setdefault("areas", None) - ospf_external_lsa = input_dict.setdefault( - 'asExternalLinkStates', None) + ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None) if ospf_db_data: - for ospf_area, area_lsa in ospf_db_data.items(): - if ospf_area in show_ospf_json['areas']: - if 'routerLinkStates' in area_lsa: - for lsa in area_lsa['routerLinkStates']: - for rtrlsa in show_ospf_json['areas'][ospf_area][ - 'routerLinkStates']: - if lsa['lsaId'] == rtrlsa['lsaId'] and \ - lsa['advertisedRouter'] == rtrlsa[ - 'advertisedRouter']: - result = True - break - if result: - logger.info( - "[DUT: %s] OSPF LSDB area %s:Router " - "LSA %s", router, ospf_area, lsa) + for ospf_area, area_lsa in ospf_db_data.items(): + if ospf_area in show_ospf_json["areas"]: + if "routerLinkStates" in area_lsa: + for lsa in area_lsa["routerLinkStates"]: + for rtrlsa in show_ospf_json["areas"][ospf_area][ + "routerLinkStates" + ]: + if ( + lsa["lsaId"] == rtrlsa["lsaId"] + and lsa["advertisedRouter"] + == rtrlsa["advertisedRouter"] + ): + result = True break - else: - errormsg = \ - "[DUT: {}] OSPF LSDB area {}: expected" \ + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" " Router LSA is {}".format(router, ospf_area, lsa) - return errormsg + ) + return errormsg - if 'networkLinkStates' in area_lsa: - for lsa in area_lsa['networkLinkStates']: - for netlsa in show_ospf_json['areas'][ospf_area][ - 'networkLinkStates']: - if lsa in show_ospf_json['areas'][ospf_area][ - 'networkLinkStates']: - if lsa['lsaId'] == netlsa['lsaId'] and \ - lsa['advertisedRouter'] == netlsa[ - 'advertisedRouter']: - result = True - break - if result: - logger.info( - "[DUT: %s] OSPF LSDB area %s:Network " - "LSA %s", router, ospf_area, lsa) - break - else: - errormsg = \ - "[DUT: {}] OSPF LSDB area {}: expected" \ + if "networkLinkStates" in area_lsa: + for lsa in area_lsa["networkLinkStates"]: + for netlsa in show_ospf_json["areas"][ospf_area][ + "networkLinkStates" + ]: + if ( + lsa + in show_ospf_json["areas"][ospf_area][ + "networkLinkStates" + ] + ): + if ( + lsa["lsaId"] == netlsa["lsaId"] + and lsa["advertisedRouter"] + == netlsa["advertisedRouter"] + ): + result = True + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" " Network LSA is {}".format(router, ospf_area, lsa) - return errormsg + ) + return errormsg - if 'summaryLinkStates' in area_lsa: - for lsa in area_lsa['summaryLinkStates']: - for t3lsa in show_ospf_json['areas'][ospf_area][ - 'summaryLinkStates']: - if lsa['lsaId'] == t3lsa['lsaId'] and \ - lsa['advertisedRouter'] == t3lsa[ - 'advertisedRouter']: - result = True - break - if result: - logger.info( - "[DUT: %s] OSPF LSDB area %s:Summary " - "LSA %s", router, ospf_area, lsa) + if "summaryLinkStates" in area_lsa: + for lsa in area_lsa["summaryLinkStates"]: + for t3lsa in show_ospf_json["areas"][ospf_area][ + "summaryLinkStates" + ]: + if ( + lsa["lsaId"] == t3lsa["lsaId"] + and lsa["advertisedRouter"] == t3lsa["advertisedRouter"] + ): + result = True break - else: - errormsg = \ - "[DUT: {}] OSPF LSDB area {}: expected" \ + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" " Summary LSA is {}".format(router, ospf_area, lsa) - return errormsg + ) + return errormsg - if 'nssaExternalLinkStates' in area_lsa: - for lsa in area_lsa['nssaExternalLinkStates']: - for t7lsa in show_ospf_json['areas'][ospf_area][ - 'nssaExternalLinkStates']: - if lsa['lsaId'] == t7lsa['lsaId'] and \ - lsa['advertisedRouter'] == t7lsa[ - 'advertisedRouter']: - result = True - break - if result: - logger.info( - "[DUT: %s] OSPF LSDB area %s:Type7 " - "LSA %s", router, ospf_area, lsa) + if "nssaExternalLinkStates" in area_lsa: + for lsa in area_lsa["nssaExternalLinkStates"]: + for t7lsa in show_ospf_json["areas"][ospf_area][ + "nssaExternalLinkStates" + ]: + if ( + lsa["lsaId"] == t7lsa["lsaId"] + and lsa["advertisedRouter"] == t7lsa["advertisedRouter"] + ): + result = True break - else: - errormsg = \ - "[DUT: {}] OSPF LSDB area {}: expected" \ + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" " Type7 LSA is {}".format(router, ospf_area, lsa) - return errormsg + ) + return errormsg - if 'asbrSummaryLinkStates' in area_lsa: - for lsa in area_lsa['asbrSummaryLinkStates']: - for t4lsa in show_ospf_json['areas'][ospf_area][ - 'asbrSummaryLinkStates']: - if lsa['lsaId'] == t4lsa['lsaId'] and \ - lsa['advertisedRouter'] == t4lsa[ - 'advertisedRouter']: - result = True - break - if result: - logger.info( - "[DUT: %s] OSPF LSDB area %s:ASBR Summary " - "LSA %s", router, ospf_area, lsa) + if "asbrSummaryLinkStates" in area_lsa: + for lsa in area_lsa["asbrSummaryLinkStates"]: + for t4lsa in show_ospf_json["areas"][ospf_area][ + "asbrSummaryLinkStates" + ]: + if ( + lsa["lsaId"] == t4lsa["lsaId"] + and lsa["advertisedRouter"] == t4lsa["advertisedRouter"] + ): result = True - else: - errormsg = \ - "[DUT: {}] OSPF LSDB area {}: expected" \ - " ASBR Summary LSA is {}".format( - router, ospf_area, lsa) - return errormsg + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s", + router, + ospf_area, + lsa, + ) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " ASBR Summary LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg - if 'linkLocalOpaqueLsa' in area_lsa: - for lsa in area_lsa['linkLocalOpaqueLsa']: - try: - for lnklsa in show_ospf_json['areas'][ospf_area][ - 'linkLocalOpaqueLsa']: - if lsa['lsaId'] in lnklsa['lsaId'] and \ - 'linkLocalOpaqueLsa' in show_ospf_json[ - 'areas'][ospf_area]: - logger.info(( - "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA" - "%s", ospf_area, lsa)) - result = True - else: - errormsg = ("[DUT: FRR] OSPF LSDB area: {} " - "expected Opaque-LSA is {}, Found is {}".format( - ospf_area, lsa, show_ospf_json)) - raise ValueError (errormsg) - return errormsg - except KeyError: - errormsg = ("[DUT: FRR] linkLocalOpaqueLsa Not " - "present") - return errormsg + if "linkLocalOpaqueLsa" in area_lsa: + for lsa in area_lsa["linkLocalOpaqueLsa"]: + try: + for lnklsa in show_ospf_json["areas"][ospf_area][ + "linkLocalOpaqueLsa" + ]: + if ( + lsa["lsaId"] in lnklsa["lsaId"] + and "linkLocalOpaqueLsa" + in show_ospf_json["areas"][ospf_area] + ): + logger.info( + ( + "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA" + "%s", + ospf_area, + lsa, + ) + ) + result = True + else: + errormsg = ( + "[DUT: FRR] OSPF LSDB area: {} " + "expected Opaque-LSA is {}, Found is {}".format( + ospf_area, lsa, show_ospf_json + ) + ) + raise ValueError(errormsg) + return errormsg + except KeyError: + errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present" + return errormsg if ospf_external_lsa: - for lsa in ospf_external_lsa: - try: - for t5lsa in show_ospf_json['asExternalLinkStates']: - if lsa['lsaId'] == t5lsa['lsaId'] and \ - lsa['advertisedRouter'] == t5lsa[ - 'advertisedRouter']: - result = True - break - except KeyError: - result = False - if result: - logger.info( - "[DUT: %s] OSPF LSDB:External LSA %s", - router, lsa) - result = True - else: - errormsg = \ - "[DUT: {}] OSPF LSDB : expected" \ - " External LSA is {}".format(router, lsa) - return errormsg + for lsa in ospf_external_lsa: + try: + for t5lsa in show_ospf_json["asExternalLinkStates"]: + if ( + lsa["lsaId"] == t5lsa["lsaId"] + and lsa["advertisedRouter"] == t5lsa["advertisedRouter"] + ): + result = True + break + except KeyError: + result = False + if result: + logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB : expected" + " External LSA is {}".format(router, lsa) + ) + return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result - -def config_ospf6_interface (tgen, topo, input_dict=None, build=False, - load_config=True): +def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config=True): """ API to configure ospf on router. @@ -2172,6 +2354,9 @@ def config_ospf6_interface (tgen, topo, input_dict=None, build=False, input_dict = deepcopy(topo) else: input_dict = deepcopy(input_dict) + + config_data_dict = {} + for router in input_dict.keys(): config_data = [] for lnk in input_dict[router]['links'].keys(): @@ -2180,17 +2365,17 @@ def config_ospf6_interface (tgen, topo, input_dict=None, build=False, "input_dict, passed input_dict %s", router, str(input_dict)) continue - ospf_data = input_dict[router]['links'][lnk]['ospf6'] + ospf_data = input_dict[router]["links"][lnk]["ospf6"] data_ospf_area = ospf_data.setdefault("area", None) - data_ospf_auth = ospf_data.setdefault("authentication", None) + data_ospf_auth = ospf_data.setdefault("hash-algo", None) data_ospf_dr_priority = ospf_data.setdefault("priority", None) data_ospf_cost = ospf_data.setdefault("cost", None) data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None) try: - intf = topo['routers'][router]['links'][lnk]['interface'] + intf = topo["routers"][router]["links"][lnk]["interface"] except KeyError: - intf = topo['switches'][router]['links'][lnk]['interface'] + intf = topo["switches"][router]["links"][lnk]["interface"] # interface cmd = "interface {}".format(intf) @@ -2201,34 +2386,117 @@ def config_ospf6_interface (tgen, topo, input_dict=None, build=False, cmd = "ipv6 ospf area {}".format(data_ospf_area) config_data.append(cmd) + # interface ospf auth + if data_ospf_auth: + cmd = "ipv6 ospf6 authentication" + + if "del_action" in ospf_data: + cmd = "no {}".format(cmd) + + if "hash-algo" in ospf_data: + cmd = "{} key-id {} hash-algo {} key {}".format( + cmd, + ospf_data["key-id"], + ospf_data["hash-algo"], + ospf_data["key"], + ) + if "del_action" in ospf_data: + cmd = "no {}".format(cmd) + config_data.append(cmd) + # interface ospf dr priority if data_ospf_dr_priority: - cmd = "ipv6 ospf priority {}".format( - ospf_data["priority"]) - if 'del_action' in ospf_data: + cmd = "ipv6 ospf priority {}".format(ospf_data["priority"]) + if "del_action" in ospf_data: cmd = "no {}".format(cmd) config_data.append(cmd) # interface ospf cost if data_ospf_cost: - cmd = "ipv6 ospf cost {}".format( - ospf_data["cost"]) - if 'del_action' in ospf_data: + cmd = "ipv6 ospf cost {}".format(ospf_data["cost"]) + if "del_action" in ospf_data: cmd = "no {}".format(cmd) config_data.append(cmd) # interface ospf mtu if data_ospf_mtu: cmd = "ipv6 ospf mtu-ignore" - if 'del_action' in ospf_data: + if "del_action" in ospf_data: cmd = "no {}".format(cmd) config_data.append(cmd) if build: return config_data + + if config_data: + config_data_dict[router] = config_data + + result = create_common_configurations( + tgen, config_data_dict, "interface_config", build=build + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result + +@retry(retry_timeout=20) +def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None): + """ + This API is used to vreify gr helper using command + show ip ospf graceful-restart helper + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : topology descriptions + * 'dut' : router + * 'input_dict' - values to be verified + + Usage: + ------- + input_dict = { + "helperSupport":"Disabled", + "strictLsaCheck":"Enabled", + "restartSupoort":"Planned and Unplanned Restarts", + "supportedGracePeriod":1800 + } + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + + """ + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + result = False + + if 'ospf' not in topo['routers'][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format( + dut) + return errormsg + + rnode = tgen.routers()[dut] + logger.info("Verifying OSPF GR details on router %s:", dut) + show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json", + isjson=True) + + # Verifying output dictionary show_ospf_json is empty or not + if not bool(show_ospf_json): + errormsg = "OSPF is not running" + raise ValueError (errormsg) + return errormsg + + for ospf_gr, gr_data in input_dict.items(): + try: + if input_dict[ospf_gr] == show_ospf_json[ospf_gr]: + logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr, + show_ospf_json[ospf_gr]) + result = True else: - result = create_common_configuration(tgen, router, config_data, - "interface_config", - build=build) + errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found " + "is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[ + ospf_gr])) + raise ValueError (errormsg) + return errormsg + + except KeyError: + errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)) + return errormsg + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result |
