summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py219
1 files changed, 132 insertions, 87 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index beac768905..c425e121af 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -18,37 +18,28 @@
# OF THIS SOFTWARE.
#
-import ipaddr
import ipaddress
import sys
-
from copy import deepcopy
-from time import sleep
-from lib.topolog import logger
-from lib.topotest import frr_unicode
-from ipaddress import IPv6Address
-import sys
# Import common_config to use commomnly used APIs
from lib.common_config import (
create_common_configurations,
InvalidCLIError,
- retry,
generate_ips,
- check_address_types,
- validate_ip_address,
+ retry,
run_frr_cmd,
+ validate_ip_address,
)
-
-LOGDIR = "/tmp/topotests/"
-TMPDIR = None
+from lib.topolog import logger
+from lib.topotest import frr_unicode
################################
# Configure procs
################################
-def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=True):
+def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_config=True):
"""
API to configure ospf on router.
@@ -79,6 +70,9 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru
logger.debug("Entering lib API: create_router_ospf()")
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if not input_dict:
input_dict = deepcopy(topo)
else:
@@ -113,9 +107,7 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru
return result
-def __create_ospf_global(
- tgen, input_dict, router, build, load_config, ospf
-):
+def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf):
"""
Helper API to create ospf global configuration.
@@ -190,8 +182,7 @@ def __create_ospf_global(
if del_log_adj_changes:
config_data.append("no log-adjacency-changes detail")
if log_adj_changes:
- config_data.append("log-adjacency-changes {}".format(
- log_adj_changes))
+ config_data.append("log-adjacency-changes {}".format(log_adj_changes))
# aggregation timer
aggr_timer = ospf_data.setdefault("aggr_timer", None)
@@ -199,8 +190,7 @@ def __create_ospf_global(
if del_aggr_timer:
config_data.append("no aggregation timer")
if aggr_timer:
- config_data.append("aggregation timer {}".format(
- aggr_timer))
+ config_data.append("aggregation timer {}".format(aggr_timer))
# maximum path information
ecmp_data = ospf_data.setdefault("maximum-paths", {})
@@ -248,12 +238,13 @@ def __create_ospf_global(
cmd = "no {}".format(cmd)
config_data.append(cmd)
- #def route information
+ # def route information
def_rte_data = ospf_data.setdefault("default-information", {})
if def_rte_data:
if "originate" not in def_rte_data:
- logger.debug("Router %s: 'originate key' not present in "
- "input_dict", router)
+ logger.debug(
+ "Router %s: 'originate key' not present in " "input_dict", router
+ )
else:
cmd = "default-information originate"
@@ -264,8 +255,7 @@ def __create_ospf_global(
cmd = cmd + " metric {}".format(def_rte_data["metric"])
if "metric-type" in def_rte_data:
- cmd = cmd + " metric-type {}".format(def_rte_data[
- "metric-type"])
+ cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
if "route-map" in def_rte_data:
cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
@@ -290,19 +280,19 @@ def __create_ospf_global(
config_data.append(cmd)
try:
- if "area" in input_dict[router]['links'][neighbor][
- 'ospf6']:
+ if "area" in input_dict[router]["links"][neighbor]["ospf6"]:
iface = input_dict[router]["links"][neighbor]["interface"]
cmd = "interface {} area {}".format(
- iface, input_dict[router]['links'][neighbor][
- 'ospf6']['area'])
- if input_dict[router]['links'][neighbor].setdefault(
- "delete", False):
+ iface,
+ input_dict[router]["links"][neighbor]["ospf6"]["area"],
+ )
+ if input_dict[router]["links"][neighbor].setdefault(
+ "delete", False
+ ):
cmd = "no {}".format(cmd)
config_data.append(cmd)
except KeyError:
- pass
-
+ pass
# summary information
summary_data = ospf_data.setdefault("summary-address", {})
@@ -339,14 +329,14 @@ def __create_ospf_global(
cmd = "no {}".format(cmd)
config_data.append(cmd)
- if "helper-only" in gr_data and not gr_data["helper-only"]:
- cmd = "graceful-restart helper-only"
+ if "helper enable" in gr_data and not gr_data["helper enable"]:
+ cmd = "graceful-restart helper enable"
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
- elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list:
- for rtrs in gr_data["helper-only"]:
- cmd = "graceful-restart helper-only {}".format(rtrs)
+ elif "helper enable" in gr_data and type(gr_data["helper enable"]) is list:
+ for rtrs in gr_data["helper enable"]:
+ cmd = "graceful-restart helper enable {}".format(rtrs)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
@@ -373,7 +363,9 @@ def __create_ospf_global(
return config_data
-def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True):
+def create_router_ospf6(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
"""
API to configure ospf on router
@@ -400,6 +392,9 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr
logger.debug("Entering lib API: create_router_ospf6()")
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if not input_dict:
input_dict = deepcopy(topo)
else:
@@ -431,7 +426,9 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr
return result
-def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=True):
+def config_ospf_interface(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
"""
API to configure ospf on router.
@@ -466,6 +463,10 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
"""
logger.debug("Enter lib config_ospf_interface")
result = False
+
+ if topo is None:
+ topo = tgen.json_topo
+
if not input_dict:
input_dict = deepcopy(topo)
else:
@@ -632,7 +633,9 @@ def redistribute_ospf(tgen, topo, dut, route_type, **kwargs):
# Verification procs
################################
@retry(retry_timeout=80)
-def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expected=True):
+def verify_ospf_neighbor(
+ tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True
+):
"""
This API is to verify ospf neighborship by running
show ip ospf neighbour command,
@@ -680,6 +683,9 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec
"""
logger.debug("Entering lib API: verify_ospf_neighbor()")
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if input_dict:
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
@@ -827,7 +833,7 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec
# Verification procs
################################
@retry(retry_timeout=50)
-def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
+def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False):
"""
This API is to verify ospf neighborship by running
show ipv6 ospf neighbour command,
@@ -875,6 +881,9 @@ def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if input_dict:
for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
@@ -1133,7 +1142,7 @@ def verify_ospf_rib(
nh_found = False
for st_rt in ip_list:
- st_rt = str(ipaddr.IPNetwork(frr_unicode(st_rt)))
+ st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != "ipv4":
@@ -1318,7 +1327,9 @@ def verify_ospf_rib(
@retry(retry_timeout=20)
-def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None, expected=True):
+def verify_ospf_interface(
+ tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True
+):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
@@ -1360,6 +1371,9 @@ def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None, expe
logger.debug("Entering lib API: verify_ospf_interface()")
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
continue
@@ -1611,21 +1625,21 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
rnode = tgen.routers()[dut]
if ospf:
- if 'ospf6' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(
- router)
+ if "ospf6" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router)
return errormsg
- show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf summary detail json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf summary detail json", isjson=True
+ )
else:
- if 'ospf' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
- router)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
return errormsg
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf summary detail json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
@@ -1636,23 +1650,35 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
ospf_summary_data = input_dict
if ospf:
- show_ospf_json = show_ospf_json['default']
+ show_ospf_json = show_ospf_json["default"]
for ospf_summ, summ_data in ospf_summary_data.items():
if ospf_summ not in show_ospf_json:
continue
- summary = ospf_summary_data[ospf_summ]['Summary address']
+ summary = ospf_summary_data[ospf_summ]["Summary address"]
if summary in show_ospf_json:
for summ in summ_data:
if summ_data[summ] == show_ospf_json[summary][summ]:
- logger.info("[DUT: %s] OSPF summary %s:%s is %s",
- router, summary, summ, summ_data[summ])
+ logger.info(
+ "[DUT: %s] OSPF summary %s:%s is %s",
+ router,
+ summary,
+ summ,
+ summ_data[summ],
+ )
result = True
else:
- errormsg = ("[DUT: {}] OSPF summary {} : {} is {}, "
- "Expected is {}".format(router, summary, summ,show_ospf_json[
- summary][summ], summ_data[summ] ))
+ errormsg = (
+ "[DUT: {}] OSPF summary {} : {} is {}, "
+ "Expected is {}".format(
+ router,
+ summary,
+ summ,
+ show_ospf_json[summary][summ],
+ summ_data[summ],
+ )
+ )
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
@@ -1660,8 +1686,9 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
@retry(retry_timeout=30)
-def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
- tag=None, metric=None, fib=None):
+def verify_ospf6_rib(
+ tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
+):
"""
This API is to verify ospf routes by running
show ip ospf route command.
@@ -1703,7 +1730,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
additional_nexthops_in_required_nhs = []
found_hops = []
for routerInput in input_dict.keys():
- for router, rnode in router_list.iteritems():
+ for router, rnode in router_list.items():
if router != dut:
continue
@@ -1936,7 +1963,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
@retry(retry_timeout=6)
-def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None):
+def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
@@ -1978,7 +2005,10 @@ def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None):
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
- for router, rnode in tgen.routers().iteritems():
+ if topo is None:
+ topo = tgen.json_topo
+
+ for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
continue
@@ -2315,7 +2345,9 @@ def verify_ospf6_database(tgen, topo, dut, input_dict):
return result
-def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config=True):
+def config_ospf6_interface(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
"""
API to configure ospf on router.
@@ -2350,6 +2382,9 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if not input_dict:
input_dict = deepcopy(topo)
else:
@@ -2359,11 +2394,14 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
for router in input_dict.keys():
config_data = []
- for lnk in input_dict[router]['links'].keys():
- if "ospf6" not in input_dict[router]['links'][lnk]:
- logger.debug("Router %s: ospf6 config is not present in"
- "input_dict, passed input_dict %s", router,
- str(input_dict))
+ for lnk in input_dict[router]["links"].keys():
+ if "ospf6" not in input_dict[router]["links"][lnk]:
+ logger.debug(
+ "Router %s: ospf6 config is not present in"
+ "input_dict, passed input_dict %s",
+ router,
+ str(input_dict),
+ )
continue
ospf_data = input_dict[router]["links"][lnk]["ospf6"]
data_ospf_area = ospf_data.setdefault("area", None)
@@ -2438,6 +2476,7 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
+
@retry(retry_timeout=20)
def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
"""
@@ -2465,37 +2504,43 @@ def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
- if 'ospf' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
- dut)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF GR details on router %s:", dut)
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf graceful-restart helper json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
- raise ValueError (errormsg)
+ raise ValueError(errormsg)
return errormsg
- for ospf_gr, gr_data in input_dict.items():
+ for ospf_gr, gr_data in input_dict.items():
try:
if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
- logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr,
- show_ospf_json[ospf_gr])
+ logger.info(
+ "[DUT: FRR] OSPF GR Helper: %s is %s",
+ ospf_gr,
+ show_ospf_json[ospf_gr],
+ )
result = True
else:
- errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
- "is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[
- ospf_gr]))
- raise ValueError (errormsg)
+ errormsg = (
+ "[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
+ "is {}".format(
+ ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr]
+ )
+ )
+ raise ValueError(errormsg)
return errormsg
except KeyError:
- errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr))
+ errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))