diff options
Diffstat (limited to 'tests/topotests/lib/ospf.py')
| -rw-r--r-- | tests/topotests/lib/ospf.py | 489 |
1 files changed, 307 insertions, 182 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index 40da7c8fbe..beac768905 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -18,7 +18,6 @@ # OF THIS SOFTWARE. # -import traceback import ipaddr import ipaddress import sys @@ -28,10 +27,11 @@ from time import sleep from lib.topolog import logger from lib.topotest import frr_unicode from ipaddress import IPv6Address +import sys # Import common_config to use commomnly used APIs from lib.common_config import ( - create_common_configuration, + create_common_configurations, InvalidCLIError, retry, generate_ips, @@ -85,32 +85,36 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru topo = topo["routers"] input_dict = deepcopy(input_dict) - for router in input_dict.keys(): - if "ospf" not in input_dict[router]: - logger.debug("Router %s: 'ospf' not present in input_dict", router) - continue + for ospf in ["ospf", "ospf6"]: + config_data_dict = {} - result = __create_ospf_global(tgen, input_dict, router, build, load_config) - if result is True: - ospf_data = input_dict[router]["ospf"] - - for router in input_dict.keys(): - if "ospf6" not in input_dict[router]: - logger.debug("Router %s: 'ospf6' not present in input_dict", router) - continue + for router in input_dict.keys(): + if ospf not in input_dict[router]: + logger.debug("Router %s: %s not present in input_dict", router, ospf) + continue - result = __create_ospf_global( - tgen, input_dict, router, build, load_config, ospf="ospf6" - ) - if result is True: - ospf_data = input_dict[router]["ospf6"] + config_data = __create_ospf_global( + tgen, input_dict, router, build, load_config, ospf + ) + if config_data: + if router not in config_data_dict: + config_data_dict[router] = config_data + else: + config_data_dict[router].extend(config_data) + try: + result = create_common_configurations( + tgen, config_data_dict, ospf, build, load_config + ) + except InvalidCLIError: + logger.error("create_router_ospf (ipv4)", exc_info=True) + result = False logger.debug("Exiting lib API: create_router_ospf()") return result def __create_ospf_global( - tgen, input_dict, router, build=False, load_config=True, ospf="ospf" + tgen, input_dict, router, build, load_config, ospf ): """ Helper API to create ospf global configuration. @@ -132,12 +136,12 @@ def __create_ospf_global( "links": { "r3": { "ipv6": "2013:13::1/64", - "ospf6": { + "ospf6": { "hello_interval": 1, "dead_interval": 4, "network": "point-to-point" } - } + } }, "ospf6": { "router_id": "1.1.1.1", @@ -152,190 +156,221 @@ def __create_ospf_global( Returns ------- - True or False + list of configuration commands """ - result = False - logger.debug("Entering lib API: __create_ospf_global()") - try: + config_data = [] - ospf_data = input_dict[router][ospf] - del_ospf_action = ospf_data.setdefault("delete", False) - if del_ospf_action: - config_data = ["no router {}".format(ospf)] - result = create_common_configuration( - tgen, router, config_data, ospf, build, load_config - ) - return result + if ospf not in input_dict[router]: + return config_data - config_data = [] - cmd = "router {}".format(ospf) + logger.debug("Entering lib API: __create_ospf_global()") + ospf_data = input_dict[router][ospf] + del_ospf_action = ospf_data.setdefault("delete", False) + if del_ospf_action: + config_data = ["no router {}".format(ospf)] + return config_data + + cmd = "router {}".format(ospf) + + config_data.append(cmd) + + # router id + router_id = ospf_data.setdefault("router_id", None) + del_router_id = ospf_data.setdefault("del_router_id", False) + if del_router_id: + config_data.append("no {} router-id".format(ospf)) + if router_id: + config_data.append("{} router-id {}".format(ospf, router_id)) + + # log-adjacency-changes + log_adj_changes = ospf_data.setdefault("log_adj_changes", None) + del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False) + if del_log_adj_changes: + config_data.append("no log-adjacency-changes detail") + if log_adj_changes: + config_data.append("log-adjacency-changes {}".format( + log_adj_changes)) + + # aggregation timer + aggr_timer = ospf_data.setdefault("aggr_timer", None) + del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False) + if del_aggr_timer: + config_data.append("no aggregation timer") + if aggr_timer: + config_data.append("aggregation timer {}".format( + aggr_timer)) + + # maximum path information + ecmp_data = ospf_data.setdefault("maximum-paths", {}) + if ecmp_data: + cmd = "maximum-paths {}".format(ecmp_data) + del_action = ospf_data.setdefault("del_max_path", False) + if del_action: + cmd = "no maximum-paths" config_data.append(cmd) - # router id - router_id = ospf_data.setdefault("router_id", None) - del_router_id = ospf_data.setdefault("del_router_id", False) - if del_router_id: - config_data.append("no {} router-id".format(ospf)) - if router_id: - config_data.append("{} router-id {}".format(ospf, router_id)) - - # log-adjacency-changes - log_adj_changes = ospf_data.setdefault("log_adj_changes", None) - del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False) - if del_log_adj_changes: - config_data.append("no log-adjacency-changes detail") - if log_adj_changes: - config_data.append("log-adjacency-changes {}".format(log_adj_changes)) - - # aggregation timer - aggr_timer = ospf_data.setdefault("aggr_timer", None) - del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False) - if del_aggr_timer: - config_data.append("no aggregation timer") - if aggr_timer: - config_data.append("aggregation timer {}".format(aggr_timer)) - - # maximum path information - ecmp_data = ospf_data.setdefault("maximum-paths", {}) - if ecmp_data: - cmd = "maximum-paths {}".format(ecmp_data) - del_action = ospf_data.setdefault("del_max_path", False) + # redistribute command + redistribute_data = ospf_data.setdefault("redistribute", {}) + if redistribute_data: + for redistribute in redistribute_data: + if "redist_type" not in redistribute: + logger.debug( + "Router %s: 'redist_type' not present in " "input_dict", router + ) + else: + cmd = "redistribute {}".format(redistribute["redist_type"]) + for red_type in redistribute_data: + if "route_map" in red_type: + cmd = cmd + " route-map {}".format(red_type["route_map"]) + del_action = redistribute.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) + + # area information + area_data = ospf_data.setdefault("area", {}) + if area_data: + for area in area_data: + if "id" not in area: + logger.debug( + "Router %s: 'area id' not present in " "input_dict", router + ) + else: + cmd = "area {}".format(area["id"]) + + if "type" in area: + cmd = cmd + " {}".format(area["type"]) + + del_action = area.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) + + #def route information + def_rte_data = ospf_data.setdefault("default-information", {}) + if def_rte_data: + if "originate" not in def_rte_data: + logger.debug("Router %s: 'originate key' not present in " + "input_dict", router) + else: + cmd = "default-information originate" + + if "always" in def_rte_data: + cmd = cmd + " always" + + if "metric" in def_rte_data: + cmd = cmd + " metric {}".format(def_rte_data["metric"]) + + if "metric-type" in def_rte_data: + cmd = cmd + " metric-type {}".format(def_rte_data[ + "metric-type"]) + + if "route-map" in def_rte_data: + cmd = cmd + " route-map {}".format(def_rte_data["route-map"]) + + del_action = def_rte_data.setdefault("delete", False) if del_action: - cmd = "no maximum-paths" + cmd = "no {}".format(cmd) config_data.append(cmd) - # redistribute command - redistribute_data = ospf_data.setdefault("redistribute", {}) - if redistribute_data: - for redistribute in redistribute_data: - if "redist_type" not in redistribute: - logger.debug( - "Router %s: 'redist_type' not present in " "input_dict", router + # area interface information for ospf6d only + if ospf == "ospf6": + area_iface = ospf_data.setdefault("neighbors", {}) + if area_iface: + for neighbor in area_iface: + if "area" in area_iface[neighbor]: + iface = input_dict[router]["links"][neighbor]["interface"] + cmd = "interface {} area {}".format( + iface, area_iface[neighbor]["area"] ) - else: - cmd = "redistribute {}".format(redistribute["redist_type"]) - for red_type in redistribute_data: - if "route_map" in red_type: - cmd = cmd + " route-map {}".format(red_type["route_map"]) - del_action = redistribute.setdefault("delete", False) - if del_action: + if area_iface[neighbor].setdefault("delete", False): cmd = "no {}".format(cmd) config_data.append(cmd) - # area information - area_data = ospf_data.setdefault("area", {}) - if area_data: - for area in area_data: - if "id" not in area: - logger.debug( - "Router %s: 'area id' not present in " "input_dict", router - ) - else: - cmd = "area {}".format(area["id"]) + try: + if "area" in input_dict[router]['links'][neighbor][ + 'ospf6']: + iface = input_dict[router]["links"][neighbor]["interface"] + cmd = "interface {} area {}".format( + iface, input_dict[router]['links'][neighbor][ + 'ospf6']['area']) + if input_dict[router]['links'][neighbor].setdefault( + "delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + except KeyError: + pass - if "type" in area: - cmd = cmd + " {}".format(area["type"]) - del_action = area.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) - - # def route information - def_rte_data = ospf_data.setdefault("default-information", {}) - if def_rte_data: - if "originate" not in def_rte_data: + # summary information + summary_data = ospf_data.setdefault("summary-address", {}) + if summary_data: + for summary in summary_data: + if "prefix" not in summary: logger.debug( - "Router %s: 'originate key' not present in " "input_dict", router + "Router %s: 'summary-address' not present in " "input_dict", + router, ) else: - cmd = "default-information originate" - - if "always" in def_rte_data: - cmd = cmd + " always" - - if "metric" in def_rte_data: - cmd = cmd + " metric {}".format(def_rte_data["metric"]) + cmd = "summary {}/{}".format(summary["prefix"], summary["mask"]) - if "metric-type" in def_rte_data: - cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"]) + _tag = summary.setdefault("tag", None) + if _tag: + cmd = "{} tag {}".format(cmd, _tag) - if "route-map" in def_rte_data: - cmd = cmd + " route-map {}".format(def_rte_data["route-map"]) + _advertise = summary.setdefault("advertise", True) + if not _advertise: + cmd = "{} no-advertise".format(cmd) - del_action = def_rte_data.setdefault("delete", False) + del_action = summary.setdefault("delete", False) if del_action: cmd = "no {}".format(cmd) config_data.append(cmd) - # area interface information for ospf6d only - if ospf == "ospf6": - area_iface = ospf_data.setdefault("neighbors", {}) - if area_iface: - for neighbor in area_iface: - if "area" in area_iface[neighbor]: - iface = input_dict[router]["links"][neighbor]["interface"] - cmd = "interface {} area {}".format( - iface, area_iface[neighbor]["area"] - ) - if area_iface[neighbor].setdefault("delete", False): - cmd = "no {}".format(cmd) - config_data.append(cmd) - - try: - if "area" in input_dict[router]["links"][neighbor]["ospf6"]: - iface = input_dict[router]["links"][neighbor]["interface"] - cmd = "interface {} area {}".format( - iface, - input_dict[router]["links"][neighbor]["ospf6"]["area"], - ) - if input_dict[router]["links"][neighbor].setdefault( - "delete", False - ): - cmd = "no {}".format(cmd) - config_data.append(cmd) - except KeyError: - pass - - # summary information - summary_data = ospf_data.setdefault("summary-address", {}) - if summary_data: - for summary in summary_data: - if "prefix" not in summary: - logger.debug( - "Router %s: 'summary-address' not present in " "input_dict", - router, - ) - else: - cmd = "summary {}/{}".format(summary["prefix"], summary["mask"]) + # ospf gr information + gr_data = ospf_data.setdefault("graceful-restart", {}) + if gr_data: - _tag = summary.setdefault("tag", None) - if _tag: - cmd = "{} tag {}".format(cmd, _tag) - - _advertise = summary.setdefault("advertise", True) - if not _advertise: - cmd = "{} no-advertise".format(cmd) + if "opaque" in gr_data and gr_data["opaque"]: + cmd = "capability opaque" + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) - del_action = summary.setdefault("delete", False) - if del_action: - cmd = "no {}".format(cmd) - config_data.append(cmd) + if "helper-only" in gr_data and not gr_data["helper-only"]: + cmd = "graceful-restart helper-only" + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list: + for rtrs in gr_data["helper-only"]: + cmd = "graceful-restart helper-only {}".format(rtrs) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) - result = create_common_configuration( - tgen, router, config_data, ospf, build, load_config - ) + if "helper" in gr_data: + if type(gr_data["helper"]) is not list: + gr_data["helper"] = list(gr_data["helper"]) + for helper_role in gr_data["helper"]: + cmd = "graceful-restart helper {}".format(helper_role) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) - except InvalidCLIError: - # Traceback - errormsg = traceback.format_exc() - logger.error(errormsg) - return errormsg + if "supported-grace-time" in gr_data: + cmd = "graceful-restart helper supported-grace-time {}".format( + gr_data["supported-grace-time"] + ) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) logger.debug("Exiting lib API: create_ospf_global()") - return result + + return config_data def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True): @@ -370,14 +405,27 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr else: topo = topo["routers"] input_dict = deepcopy(input_dict) + + config_data_dict = {} + for router in input_dict.keys(): if "ospf6" not in input_dict[router]: logger.debug("Router %s: 'ospf6' not present in input_dict", router) continue - result = __create_ospf_global( + config_data = __create_ospf_global( tgen, input_dict, router, build, load_config, "ospf6" ) + if config_data: + config_data_dict[router] = config_data + + try: + result = create_common_configurations( + tgen, config_data_dict, "ospf6", build, load_config + ) + except InvalidCLIError: + logger.error("create_router_ospf6", exc_info=True) + result = False logger.debug("Exiting lib API: create_router_ospf6()") return result @@ -422,6 +470,9 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config= input_dict = deepcopy(topo) else: input_dict = deepcopy(input_dict) + + config_data_dict = {} + for router in input_dict.keys(): config_data = [] for lnk in input_dict[router]["links"].keys(): @@ -506,10 +557,14 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config= if build: return config_data - else: - result = create_common_configuration( - tgen, router, config_data, "interface_config", build=build - ) + + if config_data: + config_data_dict[router] = config_data + + result = create_common_configurations( + tgen, config_data_dict, "interface_config", build=build + ) + logger.debug("Exiting lib API: config_ospf_interface()") return result @@ -2299,6 +2354,9 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config input_dict = deepcopy(topo) else: input_dict = deepcopy(input_dict) + + config_data_dict = {} + for router in input_dict.keys(): config_data = [] for lnk in input_dict[router]['links'].keys(): @@ -2369,9 +2427,76 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config if build: return config_data + + if config_data: + config_data_dict[router] = config_data + + result = create_common_configurations( + tgen, config_data_dict, "interface_config", build=build + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result + +@retry(retry_timeout=20) +def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None): + """ + This API is used to vreify gr helper using command + show ip ospf graceful-restart helper + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : topology descriptions + * 'dut' : router + * 'input_dict' - values to be verified + + Usage: + ------- + input_dict = { + "helperSupport":"Disabled", + "strictLsaCheck":"Enabled", + "restartSupoort":"Planned and Unplanned Restarts", + "supportedGracePeriod":1800 + } + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + + """ + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + result = False + + if 'ospf' not in topo['routers'][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format( + dut) + return errormsg + + rnode = tgen.routers()[dut] + logger.info("Verifying OSPF GR details on router %s:", dut) + show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json", + isjson=True) + + # Verifying output dictionary show_ospf_json is empty or not + if not bool(show_ospf_json): + errormsg = "OSPF is not running" + raise ValueError (errormsg) + return errormsg + + for ospf_gr, gr_data in input_dict.items(): + try: + if input_dict[ospf_gr] == show_ospf_json[ospf_gr]: + logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr, + show_ospf_json[ospf_gr]) + result = True else: - result = create_common_configuration( - tgen, router, config_data, "interface_config", build=build - ) + errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found " + "is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[ + ospf_gr])) + raise ValueError (errormsg) + return errormsg + + except KeyError: + errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)) + return errormsg + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result |
