diff options
Diffstat (limited to 'tests/topotests/lib/ospf.py')
| -rw-r--r-- | tests/topotests/lib/ospf.py | 630 |
1 files changed, 400 insertions, 230 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index 40da7c8fbe..c425e121af 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -18,37 +18,28 @@ # OF THIS SOFTWARE. # -import traceback -import ipaddr import ipaddress import sys - from copy import deepcopy -from time import sleep -from lib.topolog import logger -from lib.topotest import frr_unicode -from ipaddress import IPv6Address # Import common_config to use commomnly used APIs from lib.common_config import ( - create_common_configuration, + create_common_configurations, InvalidCLIError, - retry, generate_ips, - check_address_types, - validate_ip_address, + retry, run_frr_cmd, + validate_ip_address, ) - -LOGDIR = "/tmp/topotests/" -TMPDIR = None +from lib.topolog import logger +from lib.topotest import frr_unicode ################################ # Configure procs ################################ -def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=True): +def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_config=True): """ API to configure ospf on router. @@ -79,39 +70,44 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru logger.debug("Entering lib API: create_router_ospf()") result = False + if topo is None: + topo = tgen.json_topo + if not input_dict: input_dict = deepcopy(topo) else: topo = topo["routers"] input_dict = deepcopy(input_dict) - for router in input_dict.keys(): - if "ospf" not in input_dict[router]: - logger.debug("Router %s: 'ospf' not present in input_dict", router) - continue - - result = __create_ospf_global(tgen, input_dict, router, build, load_config) - if result is True: - ospf_data = input_dict[router]["ospf"] + for ospf in ["ospf", "ospf6"]: + config_data_dict = {} - for router in input_dict.keys(): - if "ospf6" not in input_dict[router]: - logger.debug("Router %s: 'ospf6' not present in input_dict", router) - continue + for router in input_dict.keys(): + if ospf not in input_dict[router]: + logger.debug("Router %s: %s not present in input_dict", router, ospf) + continue - result = __create_ospf_global( - tgen, input_dict, router, build, load_config, ospf="ospf6" - ) - if result is True: - ospf_data = input_dict[router]["ospf6"] + config_data = __create_ospf_global( + tgen, input_dict, router, build, load_config, ospf + ) + if config_data: + if router not in config_data_dict: + config_data_dict[router] = config_data + else: + config_data_dict[router].extend(config_data) + try: + result = create_common_configurations( + tgen, config_data_dict, ospf, build, load_config + ) + except InvalidCLIError: + logger.error("create_router_ospf (ipv4)", exc_info=True) + result = False logger.debug("Exiting lib API: create_router_ospf()") return result -def __create_ospf_global( - tgen, input_dict, router, build=False, load_config=True, ospf="ospf" -): +def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf): """ Helper API to create ospf global configuration. @@ -132,12 +128,12 @@ def __create_ospf_global( "links": { "r3": { "ipv6": "2013:13::1/64", - "ospf6": { + "ospf6": { "hello_interval": 1, "dead_interval": 4, "network": "point-to-point" } - } + } }, "ospf6": { "router_id": "1.1.1.1", @@ -152,193 +148,224 @@ def __create_ospf_global( Returns ------- - True or False + list of configuration commands """ - result = False - logger.debug("Entering lib API: __create_ospf_global()") - try: + config_data = [] - ospf_data = input_dict[router][ospf] - del_ospf_action = ospf_data.setdefault("delete", False) - if del_ospf_action: - config_data = ["no router {}".format(ospf)] - result = create_common_configuration( - tgen, router, config_data, ospf, build, load_config - ) - return result + if ospf not in input_dict[router]: + return config_data - config_data = [] - cmd = "router {}".format(ospf) + logger.debug("Entering lib API: __create_ospf_global()") + ospf_data = input_dict[router][ospf] + del_ospf_action = ospf_data.setdefault("delete", False) + if del_ospf_action: + config_data = ["no router {}".format(ospf)] + return config_data + + cmd = "router {}".format(ospf) + + config_data.append(cmd) + + # router id + router_id = ospf_data.setdefault("router_id", None) + del_router_id = ospf_data.setdefault("del_router_id", False) + if del_router_id: + config_data.append("no {} router-id".format(ospf)) + if router_id: + config_data.append("{} router-id {}".format(ospf, router_id)) + + # log-adjacency-changes + log_adj_changes = ospf_data.setdefault("log_adj_changes", None) + del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False) + if del_log_adj_changes: + config_data.append("no log-adjacency-changes detail") + if log_adj_changes: + config_data.append("log-adjacency-changes {}".format(log_adj_changes)) + + # aggregation timer + aggr_timer = ospf_data.setdefault("aggr_timer", None) + del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False) + if del_aggr_timer: + config_data.append("no aggregation timer") + if aggr_timer: + config_data.append("aggregation timer {}".format(aggr_timer)) + + # maximum path information + ecmp_data = ospf_data.setdefault("maximum-paths", {}) + if ecmp_data: + cmd = "maximum-paths {}".format(ecmp_data) + del_action = ospf_data.setdefault("del_max_path", False) + if del_action: + cmd = "no maximum-paths" config_data.append(cmd) - # router id - router_id = ospf_data.setdefault("router_id", None) - del_router_id = ospf_data.setdefault("del_router_id", False) - if del_router_id: - config_data.append("no {} router-id".format(ospf)) - if router_id: - config_data.append("{} router-id {}".format(ospf, router_id)) - - # log-adjacency-changes - log_adj_changes = ospf_data.setdefault("log_adj_changes", None) - del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False) - if del_log_adj_changes: - config_data.append("no log-adjacency-changes detail") - if log_adj_changes: - config_data.append("log-adjacency-changes {}".format(log_adj_changes)) - - # aggregation timer - aggr_timer = ospf_data.setdefault("aggr_timer", None) - del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False) - if del_aggr_timer: - config_data.append("no aggregation timer") - if aggr_timer: - config_data.append("aggregation timer {}".format(aggr_timer)) - - # maximum path information - ecmp_data = ospf_data.setdefault("maximum-paths", {}) - if ecmp_data: - cmd = "maximum-paths {}".format(ecmp_data) - del_action = ospf_data.setdefault("del_max_path", False) - if del_action: - cmd = "no maximum-paths" - config_data.append(cmd) + # redistribute command + redistribute_data = ospf_data.setdefault("redistribute", {}) + if redistribute_data: + for redistribute in redistribute_data: + if "redist_type" not in redistribute: + logger.debug( + "Router %s: 'redist_type' not present in " "input_dict", router + ) + else: + cmd = "redistribute {}".format(redistribute["redist_type"]) + for red_type in redistribute_data: + if "route_map" in red_type: + cmd = cmd + " route-map {}".format(red_type["route_map"]) + del_action = redistribute.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) - # redistribute command - redistribute_data = ospf_data.setdefault("redistribute", {}) - if redistribute_data: - for redistribute in redistribute_data: - if "redist_type" not in redistribute: - logger.debug( - "Router %s: 'redist_type' not present in " "input_dict", router - ) - else: - cmd = "redistribute {}".format(redistribute["redist_type"]) - for red_type in redistribute_data: - if "route_map" in red_type: - cmd = cmd + " route-map {}".format(red_type["route_map"]) - del_action = redistribute.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) + # area information + area_data = ospf_data.setdefault("area", {}) + if area_data: + for area in area_data: + if "id" not in area: + logger.debug( + "Router %s: 'area id' not present in " "input_dict", router + ) + else: + cmd = "area {}".format(area["id"]) - # area information - area_data = ospf_data.setdefault("area", {}) - if area_data: - for area in area_data: - if "id" not in area: - logger.debug( - "Router %s: 'area id' not present in " "input_dict", router - ) - else: - cmd = "area {}".format(area["id"]) + if "type" in area: + cmd = cmd + " {}".format(area["type"]) - if "type" in area: - cmd = cmd + " {}".format(area["type"]) + del_action = area.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) - del_action = area.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) + # def route information + def_rte_data = ospf_data.setdefault("default-information", {}) + if def_rte_data: + if "originate" not in def_rte_data: + logger.debug( + "Router %s: 'originate key' not present in " "input_dict", router + ) + else: + cmd = "default-information originate" - # def route information - def_rte_data = ospf_data.setdefault("default-information", {}) - if def_rte_data: - if "originate" not in def_rte_data: - logger.debug( - "Router %s: 'originate key' not present in " "input_dict", router - ) - else: - cmd = "default-information originate" + if "always" in def_rte_data: + cmd = cmd + " always" - if "always" in def_rte_data: - cmd = cmd + " always" + if "metric" in def_rte_data: + cmd = cmd + " metric {}".format(def_rte_data["metric"]) - if "metric" in def_rte_data: - cmd = cmd + " metric {}".format(def_rte_data["metric"]) + if "metric-type" in def_rte_data: + cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"]) - if "metric-type" in def_rte_data: - cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"]) + if "route-map" in def_rte_data: + cmd = cmd + " route-map {}".format(def_rte_data["route-map"]) - if "route-map" in def_rte_data: - cmd = cmd + " route-map {}".format(def_rte_data["route-map"]) + del_action = def_rte_data.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) - del_action = def_rte_data.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) + # area interface information for ospf6d only + if ospf == "ospf6": + area_iface = ospf_data.setdefault("neighbors", {}) + if area_iface: + for neighbor in area_iface: + if "area" in area_iface[neighbor]: + iface = input_dict[router]["links"][neighbor]["interface"] + cmd = "interface {} area {}".format( + iface, area_iface[neighbor]["area"] + ) + if area_iface[neighbor].setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) - # area interface information for ospf6d only - if ospf == "ospf6": - area_iface = ospf_data.setdefault("neighbors", {}) - if area_iface: - for neighbor in area_iface: - if "area" in area_iface[neighbor]: + try: + if "area" in input_dict[router]["links"][neighbor]["ospf6"]: iface = input_dict[router]["links"][neighbor]["interface"] cmd = "interface {} area {}".format( - iface, area_iface[neighbor]["area"] + iface, + input_dict[router]["links"][neighbor]["ospf6"]["area"], ) - if area_iface[neighbor].setdefault("delete", False): + if input_dict[router]["links"][neighbor].setdefault( + "delete", False + ): cmd = "no {}".format(cmd) config_data.append(cmd) + except KeyError: + pass - try: - if "area" in input_dict[router]["links"][neighbor]["ospf6"]: - iface = input_dict[router]["links"][neighbor]["interface"] - cmd = "interface {} area {}".format( - iface, - input_dict[router]["links"][neighbor]["ospf6"]["area"], - ) - if input_dict[router]["links"][neighbor].setdefault( - "delete", False - ): - cmd = "no {}".format(cmd) - config_data.append(cmd) - except KeyError: - pass + # summary information + summary_data = ospf_data.setdefault("summary-address", {}) + if summary_data: + for summary in summary_data: + if "prefix" not in summary: + logger.debug( + "Router %s: 'summary-address' not present in " "input_dict", + router, + ) + else: + cmd = "summary {}/{}".format(summary["prefix"], summary["mask"]) - # summary information - summary_data = ospf_data.setdefault("summary-address", {}) - if summary_data: - for summary in summary_data: - if "prefix" not in summary: - logger.debug( - "Router %s: 'summary-address' not present in " "input_dict", - router, - ) - else: - cmd = "summary {}/{}".format(summary["prefix"], summary["mask"]) + _tag = summary.setdefault("tag", None) + if _tag: + cmd = "{} tag {}".format(cmd, _tag) - _tag = summary.setdefault("tag", None) - if _tag: - cmd = "{} tag {}".format(cmd, _tag) + _advertise = summary.setdefault("advertise", True) + if not _advertise: + cmd = "{} no-advertise".format(cmd) - _advertise = summary.setdefault("advertise", True) - if not _advertise: - cmd = "{} no-advertise".format(cmd) + del_action = summary.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) - del_action = summary.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) + # ospf gr information + gr_data = ospf_data.setdefault("graceful-restart", {}) + if gr_data: - result = create_common_configuration( - tgen, router, config_data, ospf, build, load_config - ) + if "opaque" in gr_data and gr_data["opaque"]: + cmd = "capability opaque" + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) - except InvalidCLIError: - # Traceback - errormsg = traceback.format_exc() - logger.error(errormsg) - return errormsg + if "helper enable" in gr_data and not gr_data["helper enable"]: + cmd = "graceful-restart helper enable" + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + elif "helper enable" in gr_data and type(gr_data["helper enable"]) is list: + for rtrs in gr_data["helper enable"]: + cmd = "graceful-restart helper enable {}".format(rtrs) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + + if "helper" in gr_data: + if type(gr_data["helper"]) is not list: + gr_data["helper"] = list(gr_data["helper"]) + for helper_role in gr_data["helper"]: + cmd = "graceful-restart helper {}".format(helper_role) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + + if "supported-grace-time" in gr_data: + cmd = "graceful-restart helper supported-grace-time {}".format( + gr_data["supported-grace-time"] + ) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) logger.debug("Exiting lib API: create_ospf_global()") - return result + + return config_data -def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True): +def create_router_ospf6( + tgen, topo=None, input_dict=None, build=False, load_config=True +): """ API to configure ospf on router @@ -365,25 +392,43 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr logger.debug("Entering lib API: create_router_ospf6()") result = False + if topo is None: + topo = tgen.json_topo + if not input_dict: input_dict = deepcopy(topo) else: topo = topo["routers"] input_dict = deepcopy(input_dict) + + config_data_dict = {} + for router in input_dict.keys(): if "ospf6" not in input_dict[router]: logger.debug("Router %s: 'ospf6' not present in input_dict", router) continue - result = __create_ospf_global( + config_data = __create_ospf_global( tgen, input_dict, router, build, load_config, "ospf6" ) + if config_data: + config_data_dict[router] = config_data + + try: + result = create_common_configurations( + tgen, config_data_dict, "ospf6", build, load_config + ) + except InvalidCLIError: + logger.error("create_router_ospf6", exc_info=True) + result = False logger.debug("Exiting lib API: create_router_ospf6()") return result -def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=True): +def config_ospf_interface( + tgen, topo=None, input_dict=None, build=False, load_config=True +): """ API to configure ospf on router. @@ -418,10 +463,17 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config= """ logger.debug("Enter lib config_ospf_interface") result = False + + if topo is None: + topo = tgen.json_topo + if not input_dict: input_dict = deepcopy(topo) else: input_dict = deepcopy(input_dict) + + config_data_dict = {} + for router in input_dict.keys(): config_data = [] for lnk in input_dict[router]["links"].keys(): @@ -506,10 +558,14 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config= if build: return config_data - else: - result = create_common_configuration( - tgen, router, config_data, "interface_config", build=build - ) + + if config_data: + config_data_dict[router] = config_data + + result = create_common_configurations( + tgen, config_data_dict, "interface_config", build=build + ) + logger.debug("Exiting lib API: config_ospf_interface()") return result @@ -577,7 +633,9 @@ def redistribute_ospf(tgen, topo, dut, route_type, **kwargs): # Verification procs ################################ @retry(retry_timeout=80) -def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expected=True): +def verify_ospf_neighbor( + tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True +): """ This API is to verify ospf neighborship by running show ip ospf neighbour command, @@ -625,6 +683,9 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec """ logger.debug("Entering lib API: verify_ospf_neighbor()") result = False + if topo is None: + topo = tgen.json_topo + if input_dict: for router, rnode in tgen.routers().items(): if "ospf" not in topo["routers"][router]: @@ -772,7 +833,7 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec # Verification procs ################################ @retry(retry_timeout=50) -def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False): +def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False): """ This API is to verify ospf neighborship by running show ipv6 ospf neighbour command, @@ -820,6 +881,9 @@ def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False): logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False + if topo is None: + topo = tgen.json_topo + if input_dict: for router, rnode in tgen.routers().items(): if "ospf6" not in topo["routers"][router]: @@ -1078,7 +1142,7 @@ def verify_ospf_rib( nh_found = False for st_rt in ip_list: - st_rt = str(ipaddr.IPNetwork(frr_unicode(st_rt))) + st_rt = str(ipaddress.ip_network(frr_unicode(st_rt))) _addr_type = validate_ip_address(st_rt) if _addr_type != "ipv4": @@ -1263,7 +1327,9 @@ def verify_ospf_rib( @retry(retry_timeout=20) -def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None, expected=True): +def verify_ospf_interface( + tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True +): """ This API is to verify ospf routes by running show ip ospf interface command. @@ -1305,6 +1371,9 @@ def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None, expe logger.debug("Entering lib API: verify_ospf_interface()") result = False + if topo is None: + topo = tgen.json_topo + for router, rnode in tgen.routers().items(): if "ospf" not in topo["routers"][router]: continue @@ -1556,21 +1625,21 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): rnode = tgen.routers()[dut] if ospf: - if 'ospf6' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format( - router) + if "ospf6" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router) return errormsg - show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf summary detail json", - isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ipv6 ospf summary detail json", isjson=True + ) else: - if 'ospf' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF is not configured on the router.".format( - router) + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router) return errormsg - show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", - isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ip ospf summary detail json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): @@ -1581,23 +1650,35 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): ospf_summary_data = input_dict if ospf: - show_ospf_json = show_ospf_json['default'] + show_ospf_json = show_ospf_json["default"] for ospf_summ, summ_data in ospf_summary_data.items(): if ospf_summ not in show_ospf_json: continue - summary = ospf_summary_data[ospf_summ]['Summary address'] + summary = ospf_summary_data[ospf_summ]["Summary address"] if summary in show_ospf_json: for summ in summ_data: if summ_data[summ] == show_ospf_json[summary][summ]: - logger.info("[DUT: %s] OSPF summary %s:%s is %s", - router, summary, summ, summ_data[summ]) + logger.info( + "[DUT: %s] OSPF summary %s:%s is %s", + router, + summary, + summ, + summ_data[summ], + ) result = True else: - errormsg = ("[DUT: {}] OSPF summary {} : {} is {}, " - "Expected is {}".format(router, summary, summ,show_ospf_json[ - summary][summ], summ_data[summ] )) + errormsg = ( + "[DUT: {}] OSPF summary {} : {} is {}, " + "Expected is {}".format( + router, + summary, + summ, + show_ospf_json[summary][summ], + summ_data[summ], + ) + ) return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) @@ -1605,8 +1686,9 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): @retry(retry_timeout=30) -def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, - tag=None, metric=None, fib=None): +def verify_ospf6_rib( + tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None +): """ This API is to verify ospf routes by running show ip ospf route command. @@ -1648,7 +1730,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, additional_nexthops_in_required_nhs = [] found_hops = [] for routerInput in input_dict.keys(): - for router, rnode in router_list.iteritems(): + for router, rnode in router_list.items(): if router != dut: continue @@ -1881,7 +1963,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, @retry(retry_timeout=6) -def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None): +def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None): """ This API is to verify ospf routes by running show ip ospf interface command. @@ -1923,7 +2005,10 @@ def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None): logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False - for router, rnode in tgen.routers().iteritems(): + if topo is None: + topo = tgen.json_topo + + for router, rnode in tgen.routers().items(): if "ospf6" not in topo["routers"][router]: continue @@ -2260,7 +2345,9 @@ def verify_ospf6_database(tgen, topo, dut, input_dict): return result -def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config=True): +def config_ospf6_interface( + tgen, topo=None, input_dict=None, build=False, load_config=True +): """ API to configure ospf on router. @@ -2295,17 +2382,26 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config """ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False + if topo is None: + topo = tgen.json_topo + if not input_dict: input_dict = deepcopy(topo) else: input_dict = deepcopy(input_dict) + + config_data_dict = {} + for router in input_dict.keys(): config_data = [] - for lnk in input_dict[router]['links'].keys(): - if "ospf6" not in input_dict[router]['links'][lnk]: - logger.debug("Router %s: ospf6 config is not present in" - "input_dict, passed input_dict %s", router, - str(input_dict)) + for lnk in input_dict[router]["links"].keys(): + if "ospf6" not in input_dict[router]["links"][lnk]: + logger.debug( + "Router %s: ospf6 config is not present in" + "input_dict, passed input_dict %s", + router, + str(input_dict), + ) continue ospf_data = input_dict[router]["links"][lnk]["ospf6"] data_ospf_area = ospf_data.setdefault("area", None) @@ -2369,9 +2465,83 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config if build: return config_data + + if config_data: + config_data_dict[router] = config_data + + result = create_common_configurations( + tgen, config_data_dict, "interface_config", build=build + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result + + +@retry(retry_timeout=20) +def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None): + """ + This API is used to vreify gr helper using command + show ip ospf graceful-restart helper + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : topology descriptions + * 'dut' : router + * 'input_dict' - values to be verified + + Usage: + ------- + input_dict = { + "helperSupport":"Disabled", + "strictLsaCheck":"Enabled", + "restartSupoort":"Planned and Unplanned Restarts", + "supportedGracePeriod":1800 + } + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + + """ + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + result = False + + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut) + return errormsg + + rnode = tgen.routers()[dut] + logger.info("Verifying OSPF GR details on router %s:", dut) + show_ospf_json = run_frr_cmd( + rnode, "show ip ospf graceful-restart helper json", isjson=True + ) + + # Verifying output dictionary show_ospf_json is empty or not + if not bool(show_ospf_json): + errormsg = "OSPF is not running" + raise ValueError(errormsg) + return errormsg + + for ospf_gr, gr_data in input_dict.items(): + try: + if input_dict[ospf_gr] == show_ospf_json[ospf_gr]: + logger.info( + "[DUT: FRR] OSPF GR Helper: %s is %s", + ospf_gr, + show_ospf_json[ospf_gr], + ) + result = True else: - result = create_common_configuration( - tgen, router, config_data, "interface_config", build=build + errormsg = ( + "[DUT: FRR] OSPF GR Helper: {} expected is {}, Found " + "is {}".format( + ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr] + ) ) + raise ValueError(errormsg) + return errormsg + + except KeyError: + errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr) + return errormsg + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result |
