summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py159
1 files changed, 94 insertions, 65 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index 803909f357..c21cbf0dd8 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -107,9 +107,7 @@ def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_confi
return result
-def __create_ospf_global(
- tgen, input_dict, router, build, load_config, ospf
-):
+def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf):
"""
Helper API to create ospf global configuration.
@@ -184,8 +182,7 @@ def __create_ospf_global(
if del_log_adj_changes:
config_data.append("no log-adjacency-changes detail")
if log_adj_changes:
- config_data.append("log-adjacency-changes {}".format(
- log_adj_changes))
+ config_data.append("log-adjacency-changes {}".format(log_adj_changes))
# aggregation timer
aggr_timer = ospf_data.setdefault("aggr_timer", None)
@@ -193,8 +190,7 @@ def __create_ospf_global(
if del_aggr_timer:
config_data.append("no aggregation timer")
if aggr_timer:
- config_data.append("aggregation timer {}".format(
- aggr_timer))
+ config_data.append("aggregation timer {}".format(aggr_timer))
# maximum path information
ecmp_data = ospf_data.setdefault("maximum-paths", {})
@@ -242,12 +238,13 @@ def __create_ospf_global(
cmd = "no {}".format(cmd)
config_data.append(cmd)
- #def route information
+ # def route information
def_rte_data = ospf_data.setdefault("default-information", {})
if def_rte_data:
if "originate" not in def_rte_data:
- logger.debug("Router %s: 'originate key' not present in "
- "input_dict", router)
+ logger.debug(
+ "Router %s: 'originate key' not present in " "input_dict", router
+ )
else:
cmd = "default-information originate"
@@ -258,8 +255,7 @@ def __create_ospf_global(
cmd = cmd + " metric {}".format(def_rte_data["metric"])
if "metric-type" in def_rte_data:
- cmd = cmd + " metric-type {}".format(def_rte_data[
- "metric-type"])
+ cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
if "route-map" in def_rte_data:
cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
@@ -284,19 +280,19 @@ def __create_ospf_global(
config_data.append(cmd)
try:
- if "area" in input_dict[router]['links'][neighbor][
- 'ospf6']:
+ if "area" in input_dict[router]["links"][neighbor]["ospf6"]:
iface = input_dict[router]["links"][neighbor]["interface"]
cmd = "interface {} area {}".format(
- iface, input_dict[router]['links'][neighbor][
- 'ospf6']['area'])
- if input_dict[router]['links'][neighbor].setdefault(
- "delete", False):
+ iface,
+ input_dict[router]["links"][neighbor]["ospf6"]["area"],
+ )
+ if input_dict[router]["links"][neighbor].setdefault(
+ "delete", False
+ ):
cmd = "no {}".format(cmd)
config_data.append(cmd)
except KeyError:
- pass
-
+ pass
# summary information
summary_data = ospf_data.setdefault("summary-address", {})
@@ -367,7 +363,9 @@ def __create_ospf_global(
return config_data
-def create_router_ospf6(tgen, topo=None, input_dict=None, build=False, load_config=True):
+def create_router_ospf6(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
"""
API to configure ospf on router
@@ -428,7 +426,9 @@ def create_router_ospf6(tgen, topo=None, input_dict=None, build=False, load_conf
return result
-def config_ospf_interface(tgen, topo=None, input_dict=None, build=False, load_config=True):
+def config_ospf_interface(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
"""
API to configure ospf on router.
@@ -633,7 +633,9 @@ def redistribute_ospf(tgen, topo, dut, route_type, **kwargs):
# Verification procs
################################
@retry(retry_timeout=80)
-def verify_ospf_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True):
+def verify_ospf_neighbor(
+ tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True
+):
"""
This API is to verify ospf neighborship by running
show ip ospf neighbour command,
@@ -1325,7 +1327,9 @@ def verify_ospf_rib(
@retry(retry_timeout=20)
-def verify_ospf_interface(tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True):
+def verify_ospf_interface(
+ tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True
+):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
@@ -1621,21 +1625,21 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
rnode = tgen.routers()[dut]
if ospf:
- if 'ospf6' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(
- router)
+ if "ospf6" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router)
return errormsg
- show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf summary detail json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf summary detail json", isjson=True
+ )
else:
- if 'ospf' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
- router)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
return errormsg
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf summary detail json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
@@ -1646,23 +1650,35 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
ospf_summary_data = input_dict
if ospf:
- show_ospf_json = show_ospf_json['default']
+ show_ospf_json = show_ospf_json["default"]
for ospf_summ, summ_data in ospf_summary_data.items():
if ospf_summ not in show_ospf_json:
continue
- summary = ospf_summary_data[ospf_summ]['Summary address']
+ summary = ospf_summary_data[ospf_summ]["Summary address"]
if summary in show_ospf_json:
for summ in summ_data:
if summ_data[summ] == show_ospf_json[summary][summ]:
- logger.info("[DUT: %s] OSPF summary %s:%s is %s",
- router, summary, summ, summ_data[summ])
+ logger.info(
+ "[DUT: %s] OSPF summary %s:%s is %s",
+ router,
+ summary,
+ summ,
+ summ_data[summ],
+ )
result = True
else:
- errormsg = ("[DUT: {}] OSPF summary {} : {} is {}, "
- "Expected is {}".format(router, summary, summ,show_ospf_json[
- summary][summ], summ_data[summ] ))
+ errormsg = (
+ "[DUT: {}] OSPF summary {} : {} is {}, "
+ "Expected is {}".format(
+ router,
+ summary,
+ summ,
+ show_ospf_json[summary][summ],
+ summ_data[summ],
+ )
+ )
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
@@ -1670,8 +1686,9 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
@retry(retry_timeout=30)
-def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
- tag=None, metric=None, fib=None):
+def verify_ospf6_rib(
+ tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
+):
"""
This API is to verify ospf routes by running
show ip ospf route command.
@@ -1946,7 +1963,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
@retry(retry_timeout=6)
-def verify_ospf6_interface(tgen, topo=None, dut=None,lan=False, input_dict=None):
+def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
@@ -1992,7 +2009,7 @@ def verify_ospf6_interface(tgen, topo=None, dut=None,lan=False, input_dict=None)
topo = tgen.json_topo
for router, rnode in tgen.routers().items():
- if 'ospf6' not in topo['routers'][router]:
+ if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
@@ -2328,7 +2345,9 @@ def verify_ospf6_database(tgen, topo, dut, input_dict):
return result
-def config_ospf6_interface(tgen, topo=None, input_dict=None, build=False, load_config=True):
+def config_ospf6_interface(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
"""
API to configure ospf on router.
@@ -2375,11 +2394,14 @@ def config_ospf6_interface(tgen, topo=None, input_dict=None, build=False, load_c
for router in input_dict.keys():
config_data = []
- for lnk in input_dict[router]['links'].keys():
- if "ospf6" not in input_dict[router]['links'][lnk]:
- logger.debug("Router %s: ospf6 config is not present in"
- "input_dict, passed input_dict %s", router,
- str(input_dict))
+ for lnk in input_dict[router]["links"].keys():
+ if "ospf6" not in input_dict[router]["links"][lnk]:
+ logger.debug(
+ "Router %s: ospf6 config is not present in"
+ "input_dict, passed input_dict %s",
+ router,
+ str(input_dict),
+ )
continue
ospf_data = input_dict[router]["links"][lnk]["ospf6"]
data_ospf_area = ospf_data.setdefault("area", None)
@@ -2454,6 +2476,7 @@ def config_ospf6_interface(tgen, topo=None, input_dict=None, build=False, load_c
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
+
@retry(retry_timeout=20)
def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
"""
@@ -2481,37 +2504,43 @@ def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
- if 'ospf' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
- dut)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF GR details on router %s:", dut)
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf graceful-restart helper json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
- raise ValueError (errormsg)
+ raise ValueError(errormsg)
return errormsg
- for ospf_gr, gr_data in input_dict.items():
+ for ospf_gr, gr_data in input_dict.items():
try:
if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
- logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr,
- show_ospf_json[ospf_gr])
+ logger.info(
+ "[DUT: FRR] OSPF GR Helper: %s is %s",
+ ospf_gr,
+ show_ospf_json[ospf_gr],
+ )
result = True
else:
- errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
- "is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[
- ospf_gr]))
- raise ValueError (errormsg)
+ errormsg = (
+ "[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
+ "is {}".format(
+ ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr]
+ )
+ )
+ raise ValueError(errormsg)
return errormsg
except KeyError:
- errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr))
+ errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))