diff options
Diffstat (limited to 'tests/topotests/lib/ospf.py')
| -rw-r--r-- | tests/topotests/lib/ospf.py | 219 |
1 files changed, 132 insertions, 87 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index beac768905..c425e121af 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -18,37 +18,28 @@ # OF THIS SOFTWARE. # -import ipaddr import ipaddress import sys - from copy import deepcopy -from time import sleep -from lib.topolog import logger -from lib.topotest import frr_unicode -from ipaddress import IPv6Address -import sys # Import common_config to use commomnly used APIs from lib.common_config import ( create_common_configurations, InvalidCLIError, - retry, generate_ips, - check_address_types, - validate_ip_address, + retry, run_frr_cmd, + validate_ip_address, ) - -LOGDIR = "/tmp/topotests/" -TMPDIR = None +from lib.topolog import logger +from lib.topotest import frr_unicode ################################ # Configure procs ################################ -def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=True): +def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_config=True): """ API to configure ospf on router. @@ -79,6 +70,9 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru logger.debug("Entering lib API: create_router_ospf()") result = False + if topo is None: + topo = tgen.json_topo + if not input_dict: input_dict = deepcopy(topo) else: @@ -113,9 +107,7 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru return result -def __create_ospf_global( - tgen, input_dict, router, build, load_config, ospf -): +def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf): """ Helper API to create ospf global configuration. @@ -190,8 +182,7 @@ def __create_ospf_global( if del_log_adj_changes: config_data.append("no log-adjacency-changes detail") if log_adj_changes: - config_data.append("log-adjacency-changes {}".format( - log_adj_changes)) + config_data.append("log-adjacency-changes {}".format(log_adj_changes)) # aggregation timer aggr_timer = ospf_data.setdefault("aggr_timer", None) @@ -199,8 +190,7 @@ def __create_ospf_global( if del_aggr_timer: config_data.append("no aggregation timer") if aggr_timer: - config_data.append("aggregation timer {}".format( - aggr_timer)) + config_data.append("aggregation timer {}".format(aggr_timer)) # maximum path information ecmp_data = ospf_data.setdefault("maximum-paths", {}) @@ -248,12 +238,13 @@ def __create_ospf_global( cmd = "no {}".format(cmd) config_data.append(cmd) - #def route information + # def route information def_rte_data = ospf_data.setdefault("default-information", {}) if def_rte_data: if "originate" not in def_rte_data: - logger.debug("Router %s: 'originate key' not present in " - "input_dict", router) + logger.debug( + "Router %s: 'originate key' not present in " "input_dict", router + ) else: cmd = "default-information originate" @@ -264,8 +255,7 @@ def __create_ospf_global( cmd = cmd + " metric {}".format(def_rte_data["metric"]) if "metric-type" in def_rte_data: - cmd = cmd + " metric-type {}".format(def_rte_data[ - "metric-type"]) + cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"]) if "route-map" in def_rte_data: cmd = cmd + " route-map {}".format(def_rte_data["route-map"]) @@ -290,19 +280,19 @@ def __create_ospf_global( config_data.append(cmd) try: - if "area" in input_dict[router]['links'][neighbor][ - 'ospf6']: + if "area" in input_dict[router]["links"][neighbor]["ospf6"]: iface = input_dict[router]["links"][neighbor]["interface"] cmd = "interface {} area {}".format( - iface, input_dict[router]['links'][neighbor][ - 'ospf6']['area']) - if input_dict[router]['links'][neighbor].setdefault( - "delete", False): + iface, + input_dict[router]["links"][neighbor]["ospf6"]["area"], + ) + if input_dict[router]["links"][neighbor].setdefault( + "delete", False + ): cmd = "no {}".format(cmd) config_data.append(cmd) except KeyError: - pass - + pass # summary information summary_data = ospf_data.setdefault("summary-address", {}) @@ -339,14 +329,14 @@ def __create_ospf_global( cmd = "no {}".format(cmd) config_data.append(cmd) - if "helper-only" in gr_data and not gr_data["helper-only"]: - cmd = "graceful-restart helper-only" + if "helper enable" in gr_data and not gr_data["helper enable"]: + cmd = "graceful-restart helper enable" if gr_data.setdefault("delete", False): cmd = "no {}".format(cmd) config_data.append(cmd) - elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list: - for rtrs in gr_data["helper-only"]: - cmd = "graceful-restart helper-only {}".format(rtrs) + elif "helper enable" in gr_data and type(gr_data["helper enable"]) is list: + for rtrs in gr_data["helper enable"]: + cmd = "graceful-restart helper enable {}".format(rtrs) if gr_data.setdefault("delete", False): cmd = "no {}".format(cmd) config_data.append(cmd) @@ -373,7 +363,9 @@ def __create_ospf_global( return config_data -def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True): +def create_router_ospf6( + tgen, topo=None, input_dict=None, build=False, load_config=True +): """ API to configure ospf on router @@ -400,6 +392,9 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr logger.debug("Entering lib API: create_router_ospf6()") result = False + if topo is None: + topo = tgen.json_topo + if not input_dict: input_dict = deepcopy(topo) else: @@ -431,7 +426,9 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr return result -def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=True): +def config_ospf_interface( + tgen, topo=None, input_dict=None, build=False, load_config=True +): """ API to configure ospf on router. @@ -466,6 +463,10 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config= """ logger.debug("Enter lib config_ospf_interface") result = False + + if topo is None: + topo = tgen.json_topo + if not input_dict: input_dict = deepcopy(topo) else: @@ -632,7 +633,9 @@ def redistribute_ospf(tgen, topo, dut, route_type, **kwargs): # Verification procs ################################ @retry(retry_timeout=80) -def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expected=True): +def verify_ospf_neighbor( + tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True +): """ This API is to verify ospf neighborship by running show ip ospf neighbour command, @@ -680,6 +683,9 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec """ logger.debug("Entering lib API: verify_ospf_neighbor()") result = False + if topo is None: + topo = tgen.json_topo + if input_dict: for router, rnode in tgen.routers().items(): if "ospf" not in topo["routers"][router]: @@ -827,7 +833,7 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec # Verification procs ################################ @retry(retry_timeout=50) -def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False): +def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False): """ This API is to verify ospf neighborship by running show ipv6 ospf neighbour command, @@ -875,6 +881,9 @@ def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False): logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False + if topo is None: + topo = tgen.json_topo + if input_dict: for router, rnode in tgen.routers().items(): if "ospf6" not in topo["routers"][router]: @@ -1133,7 +1142,7 @@ def verify_ospf_rib( nh_found = False for st_rt in ip_list: - st_rt = str(ipaddr.IPNetwork(frr_unicode(st_rt))) + st_rt = str(ipaddress.ip_network(frr_unicode(st_rt))) _addr_type = validate_ip_address(st_rt) if _addr_type != "ipv4": @@ -1318,7 +1327,9 @@ def verify_ospf_rib( @retry(retry_timeout=20) -def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None, expected=True): +def verify_ospf_interface( + tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True +): """ This API is to verify ospf routes by running show ip ospf interface command. @@ -1360,6 +1371,9 @@ def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None, expe logger.debug("Entering lib API: verify_ospf_interface()") result = False + if topo is None: + topo = tgen.json_topo + for router, rnode in tgen.routers().items(): if "ospf" not in topo["routers"][router]: continue @@ -1611,21 +1625,21 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): rnode = tgen.routers()[dut] if ospf: - if 'ospf6' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format( - router) + if "ospf6" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router) return errormsg - show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf summary detail json", - isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ipv6 ospf summary detail json", isjson=True + ) else: - if 'ospf' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF is not configured on the router.".format( - router) + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router) return errormsg - show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", - isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ip ospf summary detail json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): @@ -1636,23 +1650,35 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): ospf_summary_data = input_dict if ospf: - show_ospf_json = show_ospf_json['default'] + show_ospf_json = show_ospf_json["default"] for ospf_summ, summ_data in ospf_summary_data.items(): if ospf_summ not in show_ospf_json: continue - summary = ospf_summary_data[ospf_summ]['Summary address'] + summary = ospf_summary_data[ospf_summ]["Summary address"] if summary in show_ospf_json: for summ in summ_data: if summ_data[summ] == show_ospf_json[summary][summ]: - logger.info("[DUT: %s] OSPF summary %s:%s is %s", - router, summary, summ, summ_data[summ]) + logger.info( + "[DUT: %s] OSPF summary %s:%s is %s", + router, + summary, + summ, + summ_data[summ], + ) result = True else: - errormsg = ("[DUT: {}] OSPF summary {} : {} is {}, " - "Expected is {}".format(router, summary, summ,show_ospf_json[ - summary][summ], summ_data[summ] )) + errormsg = ( + "[DUT: {}] OSPF summary {} : {} is {}, " + "Expected is {}".format( + router, + summary, + summ, + show_ospf_json[summary][summ], + summ_data[summ], + ) + ) return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) @@ -1660,8 +1686,9 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): @retry(retry_timeout=30) -def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, - tag=None, metric=None, fib=None): +def verify_ospf6_rib( + tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None +): """ This API is to verify ospf routes by running show ip ospf route command. @@ -1703,7 +1730,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, additional_nexthops_in_required_nhs = [] found_hops = [] for routerInput in input_dict.keys(): - for router, rnode in router_list.iteritems(): + for router, rnode in router_list.items(): if router != dut: continue @@ -1936,7 +1963,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, @retry(retry_timeout=6) -def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None): +def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None): """ This API is to verify ospf routes by running show ip ospf interface command. @@ -1978,7 +2005,10 @@ def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None): logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False - for router, rnode in tgen.routers().iteritems(): + if topo is None: + topo = tgen.json_topo + + for router, rnode in tgen.routers().items(): if "ospf6" not in topo["routers"][router]: continue @@ -2315,7 +2345,9 @@ def verify_ospf6_database(tgen, topo, dut, input_dict): return result -def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config=True): +def config_ospf6_interface( + tgen, topo=None, input_dict=None, build=False, load_config=True +): """ API to configure ospf on router. @@ -2350,6 +2382,9 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config """ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False + if topo is None: + topo = tgen.json_topo + if not input_dict: input_dict = deepcopy(topo) else: @@ -2359,11 +2394,14 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config for router in input_dict.keys(): config_data = [] - for lnk in input_dict[router]['links'].keys(): - if "ospf6" not in input_dict[router]['links'][lnk]: - logger.debug("Router %s: ospf6 config is not present in" - "input_dict, passed input_dict %s", router, - str(input_dict)) + for lnk in input_dict[router]["links"].keys(): + if "ospf6" not in input_dict[router]["links"][lnk]: + logger.debug( + "Router %s: ospf6 config is not present in" + "input_dict, passed input_dict %s", + router, + str(input_dict), + ) continue ospf_data = input_dict[router]["links"][lnk]["ospf6"] data_ospf_area = ospf_data.setdefault("area", None) @@ -2438,6 +2476,7 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result + @retry(retry_timeout=20) def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None): """ @@ -2465,37 +2504,43 @@ def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None): logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False - if 'ospf' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF is not configured on the router.".format( - dut) + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut) return errormsg rnode = tgen.routers()[dut] logger.info("Verifying OSPF GR details on router %s:", dut) - show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json", - isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ip ospf graceful-restart helper json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): errormsg = "OSPF is not running" - raise ValueError (errormsg) + raise ValueError(errormsg) return errormsg - for ospf_gr, gr_data in input_dict.items(): + for ospf_gr, gr_data in input_dict.items(): try: if input_dict[ospf_gr] == show_ospf_json[ospf_gr]: - logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr, - show_ospf_json[ospf_gr]) + logger.info( + "[DUT: FRR] OSPF GR Helper: %s is %s", + ospf_gr, + show_ospf_json[ospf_gr], + ) result = True else: - errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found " - "is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[ - ospf_gr])) - raise ValueError (errormsg) + errormsg = ( + "[DUT: FRR] OSPF GR Helper: {} expected is {}, Found " + "is {}".format( + ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr] + ) + ) + raise ValueError(errormsg) return errormsg except KeyError: - errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)) + errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr) return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) |
