diff options
Diffstat (limited to 'tests/topotests/lib/ospf.py')
| -rw-r--r-- | tests/topotests/lib/ospf.py | 159 |
1 files changed, 94 insertions, 65 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index 803909f357..c21cbf0dd8 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -107,9 +107,7 @@ def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_confi return result -def __create_ospf_global( - tgen, input_dict, router, build, load_config, ospf -): +def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf): """ Helper API to create ospf global configuration. @@ -184,8 +182,7 @@ def __create_ospf_global( if del_log_adj_changes: config_data.append("no log-adjacency-changes detail") if log_adj_changes: - config_data.append("log-adjacency-changes {}".format( - log_adj_changes)) + config_data.append("log-adjacency-changes {}".format(log_adj_changes)) # aggregation timer aggr_timer = ospf_data.setdefault("aggr_timer", None) @@ -193,8 +190,7 @@ def __create_ospf_global( if del_aggr_timer: config_data.append("no aggregation timer") if aggr_timer: - config_data.append("aggregation timer {}".format( - aggr_timer)) + config_data.append("aggregation timer {}".format(aggr_timer)) # maximum path information ecmp_data = ospf_data.setdefault("maximum-paths", {}) @@ -242,12 +238,13 @@ def __create_ospf_global( cmd = "no {}".format(cmd) config_data.append(cmd) - #def route information + # def route information def_rte_data = ospf_data.setdefault("default-information", {}) if def_rte_data: if "originate" not in def_rte_data: - logger.debug("Router %s: 'originate key' not present in " - "input_dict", router) + logger.debug( + "Router %s: 'originate key' not present in " "input_dict", router + ) else: cmd = "default-information originate" @@ -258,8 +255,7 @@ def __create_ospf_global( cmd = cmd + " metric {}".format(def_rte_data["metric"]) if "metric-type" in def_rte_data: - cmd = cmd + " metric-type {}".format(def_rte_data[ - "metric-type"]) + cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"]) if "route-map" in def_rte_data: cmd = cmd + " route-map {}".format(def_rte_data["route-map"]) @@ -284,19 +280,19 @@ def __create_ospf_global( config_data.append(cmd) try: - if "area" in input_dict[router]['links'][neighbor][ - 'ospf6']: + if "area" in input_dict[router]["links"][neighbor]["ospf6"]: iface = input_dict[router]["links"][neighbor]["interface"] cmd = "interface {} area {}".format( - iface, input_dict[router]['links'][neighbor][ - 'ospf6']['area']) - if input_dict[router]['links'][neighbor].setdefault( - "delete", False): + iface, + input_dict[router]["links"][neighbor]["ospf6"]["area"], + ) + if input_dict[router]["links"][neighbor].setdefault( + "delete", False + ): cmd = "no {}".format(cmd) config_data.append(cmd) except KeyError: - pass - + pass # summary information summary_data = ospf_data.setdefault("summary-address", {}) @@ -367,7 +363,9 @@ def __create_ospf_global( return config_data -def create_router_ospf6(tgen, topo=None, input_dict=None, build=False, load_config=True): +def create_router_ospf6( + tgen, topo=None, input_dict=None, build=False, load_config=True +): """ API to configure ospf on router @@ -428,7 +426,9 @@ def create_router_ospf6(tgen, topo=None, input_dict=None, build=False, load_conf return result -def config_ospf_interface(tgen, topo=None, input_dict=None, build=False, load_config=True): +def config_ospf_interface( + tgen, topo=None, input_dict=None, build=False, load_config=True +): """ API to configure ospf on router. @@ -633,7 +633,9 @@ def redistribute_ospf(tgen, topo, dut, route_type, **kwargs): # Verification procs ################################ @retry(retry_timeout=80) -def verify_ospf_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True): +def verify_ospf_neighbor( + tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True +): """ This API is to verify ospf neighborship by running show ip ospf neighbour command, @@ -1325,7 +1327,9 @@ def verify_ospf_rib( @retry(retry_timeout=20) -def verify_ospf_interface(tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True): +def verify_ospf_interface( + tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True +): """ This API is to verify ospf routes by running show ip ospf interface command. @@ -1621,21 +1625,21 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): rnode = tgen.routers()[dut] if ospf: - if 'ospf6' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format( - router) + if "ospf6" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router) return errormsg - show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf summary detail json", - isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ipv6 ospf summary detail json", isjson=True + ) else: - if 'ospf' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF is not configured on the router.".format( - router) + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router) return errormsg - show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", - isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ip ospf summary detail json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): @@ -1646,23 +1650,35 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): ospf_summary_data = input_dict if ospf: - show_ospf_json = show_ospf_json['default'] + show_ospf_json = show_ospf_json["default"] for ospf_summ, summ_data in ospf_summary_data.items(): if ospf_summ not in show_ospf_json: continue - summary = ospf_summary_data[ospf_summ]['Summary address'] + summary = ospf_summary_data[ospf_summ]["Summary address"] if summary in show_ospf_json: for summ in summ_data: if summ_data[summ] == show_ospf_json[summary][summ]: - logger.info("[DUT: %s] OSPF summary %s:%s is %s", - router, summary, summ, summ_data[summ]) + logger.info( + "[DUT: %s] OSPF summary %s:%s is %s", + router, + summary, + summ, + summ_data[summ], + ) result = True else: - errormsg = ("[DUT: {}] OSPF summary {} : {} is {}, " - "Expected is {}".format(router, summary, summ,show_ospf_json[ - summary][summ], summ_data[summ] )) + errormsg = ( + "[DUT: {}] OSPF summary {} : {} is {}, " + "Expected is {}".format( + router, + summary, + summ, + show_ospf_json[summary][summ], + summ_data[summ], + ) + ) return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) @@ -1670,8 +1686,9 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True): @retry(retry_timeout=30) -def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, - tag=None, metric=None, fib=None): +def verify_ospf6_rib( + tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None +): """ This API is to verify ospf routes by running show ip ospf route command. @@ -1946,7 +1963,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None, @retry(retry_timeout=6) -def verify_ospf6_interface(tgen, topo=None, dut=None,lan=False, input_dict=None): +def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None): """ This API is to verify ospf routes by running show ip ospf interface command. @@ -1992,7 +2009,7 @@ def verify_ospf6_interface(tgen, topo=None, dut=None,lan=False, input_dict=None) topo = tgen.json_topo for router, rnode in tgen.routers().items(): - if 'ospf6' not in topo['routers'][router]: + if "ospf6" not in topo["routers"][router]: continue if dut is not None and dut != router: @@ -2328,7 +2345,9 @@ def verify_ospf6_database(tgen, topo, dut, input_dict): return result -def config_ospf6_interface(tgen, topo=None, input_dict=None, build=False, load_config=True): +def config_ospf6_interface( + tgen, topo=None, input_dict=None, build=False, load_config=True +): """ API to configure ospf on router. @@ -2375,11 +2394,14 @@ def config_ospf6_interface(tgen, topo=None, input_dict=None, build=False, load_c for router in input_dict.keys(): config_data = [] - for lnk in input_dict[router]['links'].keys(): - if "ospf6" not in input_dict[router]['links'][lnk]: - logger.debug("Router %s: ospf6 config is not present in" - "input_dict, passed input_dict %s", router, - str(input_dict)) + for lnk in input_dict[router]["links"].keys(): + if "ospf6" not in input_dict[router]["links"][lnk]: + logger.debug( + "Router %s: ospf6 config is not present in" + "input_dict, passed input_dict %s", + router, + str(input_dict), + ) continue ospf_data = input_dict[router]["links"][lnk]["ospf6"] data_ospf_area = ospf_data.setdefault("area", None) @@ -2454,6 +2476,7 @@ def config_ospf6_interface(tgen, topo=None, input_dict=None, build=False, load_c logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result + @retry(retry_timeout=20) def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None): """ @@ -2481,37 +2504,43 @@ def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None): logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False - if 'ospf' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF is not configured on the router.".format( - dut) + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut) return errormsg rnode = tgen.routers()[dut] logger.info("Verifying OSPF GR details on router %s:", dut) - show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json", - isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ip ospf graceful-restart helper json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): errormsg = "OSPF is not running" - raise ValueError (errormsg) + raise ValueError(errormsg) return errormsg - for ospf_gr, gr_data in input_dict.items(): + for ospf_gr, gr_data in input_dict.items(): try: if input_dict[ospf_gr] == show_ospf_json[ospf_gr]: - logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr, - show_ospf_json[ospf_gr]) + logger.info( + "[DUT: FRR] OSPF GR Helper: %s is %s", + ospf_gr, + show_ospf_json[ospf_gr], + ) result = True else: - errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found " - "is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[ - ospf_gr])) - raise ValueError (errormsg) + errormsg = ( + "[DUT: FRR] OSPF GR Helper: {} expected is {}, Found " + "is {}".format( + ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr] + ) + ) + raise ValueError(errormsg) return errormsg except KeyError: - errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)) + errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr) return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) |
