summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py630
1 files changed, 400 insertions, 230 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index 40da7c8fbe..c425e121af 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -18,37 +18,28 @@
# OF THIS SOFTWARE.
#
-import traceback
-import ipaddr
import ipaddress
import sys
-
from copy import deepcopy
-from time import sleep
-from lib.topolog import logger
-from lib.topotest import frr_unicode
-from ipaddress import IPv6Address
# Import common_config to use commomnly used APIs
from lib.common_config import (
- create_common_configuration,
+ create_common_configurations,
InvalidCLIError,
- retry,
generate_ips,
- check_address_types,
- validate_ip_address,
+ retry,
run_frr_cmd,
+ validate_ip_address,
)
-
-LOGDIR = "/tmp/topotests/"
-TMPDIR = None
+from lib.topolog import logger
+from lib.topotest import frr_unicode
################################
# Configure procs
################################
-def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=True):
+def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_config=True):
"""
API to configure ospf on router.
@@ -79,39 +70,44 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru
logger.debug("Entering lib API: create_router_ospf()")
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if not input_dict:
input_dict = deepcopy(topo)
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
- for router in input_dict.keys():
- if "ospf" not in input_dict[router]:
- logger.debug("Router %s: 'ospf' not present in input_dict", router)
- continue
-
- result = __create_ospf_global(tgen, input_dict, router, build, load_config)
- if result is True:
- ospf_data = input_dict[router]["ospf"]
+ for ospf in ["ospf", "ospf6"]:
+ config_data_dict = {}
- for router in input_dict.keys():
- if "ospf6" not in input_dict[router]:
- logger.debug("Router %s: 'ospf6' not present in input_dict", router)
- continue
+ for router in input_dict.keys():
+ if ospf not in input_dict[router]:
+ logger.debug("Router %s: %s not present in input_dict", router, ospf)
+ continue
- result = __create_ospf_global(
- tgen, input_dict, router, build, load_config, ospf="ospf6"
- )
- if result is True:
- ospf_data = input_dict[router]["ospf6"]
+ config_data = __create_ospf_global(
+ tgen, input_dict, router, build, load_config, ospf
+ )
+ if config_data:
+ if router not in config_data_dict:
+ config_data_dict[router] = config_data
+ else:
+ config_data_dict[router].extend(config_data)
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, ospf, build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf (ipv4)", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf()")
return result
-def __create_ospf_global(
- tgen, input_dict, router, build=False, load_config=True, ospf="ospf"
-):
+def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf):
"""
Helper API to create ospf global configuration.
@@ -132,12 +128,12 @@ def __create_ospf_global(
"links": {
"r3": {
"ipv6": "2013:13::1/64",
- "ospf6": {
+ "ospf6": {
"hello_interval": 1,
"dead_interval": 4,
"network": "point-to-point"
}
- }
+ }
},
"ospf6": {
"router_id": "1.1.1.1",
@@ -152,193 +148,224 @@ def __create_ospf_global(
Returns
-------
- True or False
+ list of configuration commands
"""
- result = False
- logger.debug("Entering lib API: __create_ospf_global()")
- try:
+ config_data = []
- ospf_data = input_dict[router][ospf]
- del_ospf_action = ospf_data.setdefault("delete", False)
- if del_ospf_action:
- config_data = ["no router {}".format(ospf)]
- result = create_common_configuration(
- tgen, router, config_data, ospf, build, load_config
- )
- return result
+ if ospf not in input_dict[router]:
+ return config_data
- config_data = []
- cmd = "router {}".format(ospf)
+ logger.debug("Entering lib API: __create_ospf_global()")
+ ospf_data = input_dict[router][ospf]
+ del_ospf_action = ospf_data.setdefault("delete", False)
+ if del_ospf_action:
+ config_data = ["no router {}".format(ospf)]
+ return config_data
+
+ cmd = "router {}".format(ospf)
+
+ config_data.append(cmd)
+
+ # router id
+ router_id = ospf_data.setdefault("router_id", None)
+ del_router_id = ospf_data.setdefault("del_router_id", False)
+ if del_router_id:
+ config_data.append("no {} router-id".format(ospf))
+ if router_id:
+ config_data.append("{} router-id {}".format(ospf, router_id))
+
+ # log-adjacency-changes
+ log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
+ del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
+ if del_log_adj_changes:
+ config_data.append("no log-adjacency-changes detail")
+ if log_adj_changes:
+ config_data.append("log-adjacency-changes {}".format(log_adj_changes))
+
+ # aggregation timer
+ aggr_timer = ospf_data.setdefault("aggr_timer", None)
+ del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
+ if del_aggr_timer:
+ config_data.append("no aggregation timer")
+ if aggr_timer:
+ config_data.append("aggregation timer {}".format(aggr_timer))
+
+ # maximum path information
+ ecmp_data = ospf_data.setdefault("maximum-paths", {})
+ if ecmp_data:
+ cmd = "maximum-paths {}".format(ecmp_data)
+ del_action = ospf_data.setdefault("del_max_path", False)
+ if del_action:
+ cmd = "no maximum-paths"
config_data.append(cmd)
- # router id
- router_id = ospf_data.setdefault("router_id", None)
- del_router_id = ospf_data.setdefault("del_router_id", False)
- if del_router_id:
- config_data.append("no {} router-id".format(ospf))
- if router_id:
- config_data.append("{} router-id {}".format(ospf, router_id))
-
- # log-adjacency-changes
- log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
- del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
- if del_log_adj_changes:
- config_data.append("no log-adjacency-changes detail")
- if log_adj_changes:
- config_data.append("log-adjacency-changes {}".format(log_adj_changes))
-
- # aggregation timer
- aggr_timer = ospf_data.setdefault("aggr_timer", None)
- del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
- if del_aggr_timer:
- config_data.append("no aggregation timer")
- if aggr_timer:
- config_data.append("aggregation timer {}".format(aggr_timer))
-
- # maximum path information
- ecmp_data = ospf_data.setdefault("maximum-paths", {})
- if ecmp_data:
- cmd = "maximum-paths {}".format(ecmp_data)
- del_action = ospf_data.setdefault("del_max_path", False)
- if del_action:
- cmd = "no maximum-paths"
- config_data.append(cmd)
+ # redistribute command
+ redistribute_data = ospf_data.setdefault("redistribute", {})
+ if redistribute_data:
+ for redistribute in redistribute_data:
+ if "redist_type" not in redistribute:
+ logger.debug(
+ "Router %s: 'redist_type' not present in " "input_dict", router
+ )
+ else:
+ cmd = "redistribute {}".format(redistribute["redist_type"])
+ for red_type in redistribute_data:
+ if "route_map" in red_type:
+ cmd = cmd + " route-map {}".format(red_type["route_map"])
+ del_action = redistribute.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- # redistribute command
- redistribute_data = ospf_data.setdefault("redistribute", {})
- if redistribute_data:
- for redistribute in redistribute_data:
- if "redist_type" not in redistribute:
- logger.debug(
- "Router %s: 'redist_type' not present in " "input_dict", router
- )
- else:
- cmd = "redistribute {}".format(redistribute["redist_type"])
- for red_type in redistribute_data:
- if "route_map" in red_type:
- cmd = cmd + " route-map {}".format(red_type["route_map"])
- del_action = redistribute.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # area information
+ area_data = ospf_data.setdefault("area", {})
+ if area_data:
+ for area in area_data:
+ if "id" not in area:
+ logger.debug(
+ "Router %s: 'area id' not present in " "input_dict", router
+ )
+ else:
+ cmd = "area {}".format(area["id"])
- # area information
- area_data = ospf_data.setdefault("area", {})
- if area_data:
- for area in area_data:
- if "id" not in area:
- logger.debug(
- "Router %s: 'area id' not present in " "input_dict", router
- )
- else:
- cmd = "area {}".format(area["id"])
+ if "type" in area:
+ cmd = cmd + " {}".format(area["type"])
- if "type" in area:
- cmd = cmd + " {}".format(area["type"])
+ del_action = area.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = area.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # def route information
+ def_rte_data = ospf_data.setdefault("default-information", {})
+ if def_rte_data:
+ if "originate" not in def_rte_data:
+ logger.debug(
+ "Router %s: 'originate key' not present in " "input_dict", router
+ )
+ else:
+ cmd = "default-information originate"
- # def route information
- def_rte_data = ospf_data.setdefault("default-information", {})
- if def_rte_data:
- if "originate" not in def_rte_data:
- logger.debug(
- "Router %s: 'originate key' not present in " "input_dict", router
- )
- else:
- cmd = "default-information originate"
+ if "always" in def_rte_data:
+ cmd = cmd + " always"
- if "always" in def_rte_data:
- cmd = cmd + " always"
+ if "metric" in def_rte_data:
+ cmd = cmd + " metric {}".format(def_rte_data["metric"])
- if "metric" in def_rte_data:
- cmd = cmd + " metric {}".format(def_rte_data["metric"])
+ if "metric-type" in def_rte_data:
+ cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
- if "metric-type" in def_rte_data:
- cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
+ if "route-map" in def_rte_data:
+ cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
- if "route-map" in def_rte_data:
- cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
+ del_action = def_rte_data.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = def_rte_data.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # area interface information for ospf6d only
+ if ospf == "ospf6":
+ area_iface = ospf_data.setdefault("neighbors", {})
+ if area_iface:
+ for neighbor in area_iface:
+ if "area" in area_iface[neighbor]:
+ iface = input_dict[router]["links"][neighbor]["interface"]
+ cmd = "interface {} area {}".format(
+ iface, area_iface[neighbor]["area"]
+ )
+ if area_iface[neighbor].setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- # area interface information for ospf6d only
- if ospf == "ospf6":
- area_iface = ospf_data.setdefault("neighbors", {})
- if area_iface:
- for neighbor in area_iface:
- if "area" in area_iface[neighbor]:
+ try:
+ if "area" in input_dict[router]["links"][neighbor]["ospf6"]:
iface = input_dict[router]["links"][neighbor]["interface"]
cmd = "interface {} area {}".format(
- iface, area_iface[neighbor]["area"]
+ iface,
+ input_dict[router]["links"][neighbor]["ospf6"]["area"],
)
- if area_iface[neighbor].setdefault("delete", False):
+ if input_dict[router]["links"][neighbor].setdefault(
+ "delete", False
+ ):
cmd = "no {}".format(cmd)
config_data.append(cmd)
+ except KeyError:
+ pass
- try:
- if "area" in input_dict[router]["links"][neighbor]["ospf6"]:
- iface = input_dict[router]["links"][neighbor]["interface"]
- cmd = "interface {} area {}".format(
- iface,
- input_dict[router]["links"][neighbor]["ospf6"]["area"],
- )
- if input_dict[router]["links"][neighbor].setdefault(
- "delete", False
- ):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
- except KeyError:
- pass
+ # summary information
+ summary_data = ospf_data.setdefault("summary-address", {})
+ if summary_data:
+ for summary in summary_data:
+ if "prefix" not in summary:
+ logger.debug(
+ "Router %s: 'summary-address' not present in " "input_dict",
+ router,
+ )
+ else:
+ cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
- # summary information
- summary_data = ospf_data.setdefault("summary-address", {})
- if summary_data:
- for summary in summary_data:
- if "prefix" not in summary:
- logger.debug(
- "Router %s: 'summary-address' not present in " "input_dict",
- router,
- )
- else:
- cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
+ _tag = summary.setdefault("tag", None)
+ if _tag:
+ cmd = "{} tag {}".format(cmd, _tag)
- _tag = summary.setdefault("tag", None)
- if _tag:
- cmd = "{} tag {}".format(cmd, _tag)
+ _advertise = summary.setdefault("advertise", True)
+ if not _advertise:
+ cmd = "{} no-advertise".format(cmd)
- _advertise = summary.setdefault("advertise", True)
- if not _advertise:
- cmd = "{} no-advertise".format(cmd)
+ del_action = summary.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = summary.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # ospf gr information
+ gr_data = ospf_data.setdefault("graceful-restart", {})
+ if gr_data:
- result = create_common_configuration(
- tgen, router, config_data, ospf, build, load_config
- )
+ if "opaque" in gr_data and gr_data["opaque"]:
+ cmd = "capability opaque"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ if "helper enable" in gr_data and not gr_data["helper enable"]:
+ cmd = "graceful-restart helper enable"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ elif "helper enable" in gr_data and type(gr_data["helper enable"]) is list:
+ for rtrs in gr_data["helper enable"]:
+ cmd = "graceful-restart helper enable {}".format(rtrs)
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "helper" in gr_data:
+ if type(gr_data["helper"]) is not list:
+ gr_data["helper"] = list(gr_data["helper"])
+ for helper_role in gr_data["helper"]:
+ cmd = "graceful-restart helper {}".format(helper_role)
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "supported-grace-time" in gr_data:
+ cmd = "graceful-restart helper supported-grace-time {}".format(
+ gr_data["supported-grace-time"]
+ )
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
logger.debug("Exiting lib API: create_ospf_global()")
- return result
+
+ return config_data
-def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True):
+def create_router_ospf6(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
"""
API to configure ospf on router
@@ -365,25 +392,43 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr
logger.debug("Entering lib API: create_router_ospf6()")
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if not input_dict:
input_dict = deepcopy(topo)
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
if "ospf6" not in input_dict[router]:
logger.debug("Router %s: 'ospf6' not present in input_dict", router)
continue
- result = __create_ospf_global(
+ config_data = __create_ospf_global(
tgen, input_dict, router, build, load_config, "ospf6"
)
+ if config_data:
+ config_data_dict[router] = config_data
+
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "ospf6", build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf6", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf6()")
return result
-def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=True):
+def config_ospf_interface(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
"""
API to configure ospf on router.
@@ -418,10 +463,17 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
"""
logger.debug("Enter lib config_ospf_interface")
result = False
+
+ if topo is None:
+ topo = tgen.json_topo
+
if not input_dict:
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]["links"].keys():
@@ -506,10 +558,14 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
if build:
return config_data
- else:
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
logger.debug("Exiting lib API: config_ospf_interface()")
return result
@@ -577,7 +633,9 @@ def redistribute_ospf(tgen, topo, dut, route_type, **kwargs):
# Verification procs
################################
@retry(retry_timeout=80)
-def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expected=True):
+def verify_ospf_neighbor(
+ tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True
+):
"""
This API is to verify ospf neighborship by running
show ip ospf neighbour command,
@@ -625,6 +683,9 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec
"""
logger.debug("Entering lib API: verify_ospf_neighbor()")
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if input_dict:
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
@@ -772,7 +833,7 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec
# Verification procs
################################
@retry(retry_timeout=50)
-def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
+def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False):
"""
This API is to verify ospf neighborship by running
show ipv6 ospf neighbour command,
@@ -820,6 +881,9 @@ def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if input_dict:
for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
@@ -1078,7 +1142,7 @@ def verify_ospf_rib(
nh_found = False
for st_rt in ip_list:
- st_rt = str(ipaddr.IPNetwork(frr_unicode(st_rt)))
+ st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != "ipv4":
@@ -1263,7 +1327,9 @@ def verify_ospf_rib(
@retry(retry_timeout=20)
-def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None, expected=True):
+def verify_ospf_interface(
+ tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True
+):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
@@ -1305,6 +1371,9 @@ def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None, expe
logger.debug("Entering lib API: verify_ospf_interface()")
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
continue
@@ -1556,21 +1625,21 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
rnode = tgen.routers()[dut]
if ospf:
- if 'ospf6' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(
- router)
+ if "ospf6" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router)
return errormsg
- show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf summary detail json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf summary detail json", isjson=True
+ )
else:
- if 'ospf' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
- router)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
return errormsg
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf summary detail json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
@@ -1581,23 +1650,35 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
ospf_summary_data = input_dict
if ospf:
- show_ospf_json = show_ospf_json['default']
+ show_ospf_json = show_ospf_json["default"]
for ospf_summ, summ_data in ospf_summary_data.items():
if ospf_summ not in show_ospf_json:
continue
- summary = ospf_summary_data[ospf_summ]['Summary address']
+ summary = ospf_summary_data[ospf_summ]["Summary address"]
if summary in show_ospf_json:
for summ in summ_data:
if summ_data[summ] == show_ospf_json[summary][summ]:
- logger.info("[DUT: %s] OSPF summary %s:%s is %s",
- router, summary, summ, summ_data[summ])
+ logger.info(
+ "[DUT: %s] OSPF summary %s:%s is %s",
+ router,
+ summary,
+ summ,
+ summ_data[summ],
+ )
result = True
else:
- errormsg = ("[DUT: {}] OSPF summary {} : {} is {}, "
- "Expected is {}".format(router, summary, summ,show_ospf_json[
- summary][summ], summ_data[summ] ))
+ errormsg = (
+ "[DUT: {}] OSPF summary {} : {} is {}, "
+ "Expected is {}".format(
+ router,
+ summary,
+ summ,
+ show_ospf_json[summary][summ],
+ summ_data[summ],
+ )
+ )
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
@@ -1605,8 +1686,9 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
@retry(retry_timeout=30)
-def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
- tag=None, metric=None, fib=None):
+def verify_ospf6_rib(
+ tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
+):
"""
This API is to verify ospf routes by running
show ip ospf route command.
@@ -1648,7 +1730,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
additional_nexthops_in_required_nhs = []
found_hops = []
for routerInput in input_dict.keys():
- for router, rnode in router_list.iteritems():
+ for router, rnode in router_list.items():
if router != dut:
continue
@@ -1881,7 +1963,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
@retry(retry_timeout=6)
-def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None):
+def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
@@ -1923,7 +2005,10 @@ def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None):
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
- for router, rnode in tgen.routers().iteritems():
+ if topo is None:
+ topo = tgen.json_topo
+
+ for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
continue
@@ -2260,7 +2345,9 @@ def verify_ospf6_database(tgen, topo, dut, input_dict):
return result
-def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config=True):
+def config_ospf6_interface(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
"""
API to configure ospf on router.
@@ -2295,17 +2382,26 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
+ if topo is None:
+ topo = tgen.json_topo
+
if not input_dict:
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
- for lnk in input_dict[router]['links'].keys():
- if "ospf6" not in input_dict[router]['links'][lnk]:
- logger.debug("Router %s: ospf6 config is not present in"
- "input_dict, passed input_dict %s", router,
- str(input_dict))
+ for lnk in input_dict[router]["links"].keys():
+ if "ospf6" not in input_dict[router]["links"][lnk]:
+ logger.debug(
+ "Router %s: ospf6 config is not present in"
+ "input_dict, passed input_dict %s",
+ router,
+ str(input_dict),
+ )
continue
ospf_data = input_dict[router]["links"][lnk]["ospf6"]
data_ospf_area = ospf_data.setdefault("area", None)
@@ -2369,9 +2465,83 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
if build:
return config_data
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+@retry(retry_timeout=20)
+def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
+ """
+ This API is used to vreify gr helper using command
+ show ip ospf graceful-restart helper
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : topology descriptions
+ * 'dut' : router
+ * 'input_dict' - values to be verified
+
+ Usage:
+ -------
+ input_dict = {
+ "helperSupport":"Disabled",
+ "strictLsaCheck":"Enabled",
+ "restartSupoort":"Planned and Unplanned Restarts",
+ "supportedGracePeriod":1800
+ }
+ result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
+
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+ logger.info("Verifying OSPF GR details on router %s:", dut)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf graceful-restart helper json", isjson=True
+ )
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ raise ValueError(errormsg)
+ return errormsg
+
+ for ospf_gr, gr_data in input_dict.items():
+ try:
+ if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
+ logger.info(
+ "[DUT: FRR] OSPF GR Helper: %s is %s",
+ ospf_gr,
+ show_ospf_json[ospf_gr],
+ )
+ result = True
else:
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
+ errormsg = (
+ "[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
+ "is {}".format(
+ ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr]
+ )
)
+ raise ValueError(errormsg)
+ return errormsg
+
+ except KeyError:
+ errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)
+ return errormsg
+
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result