summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py489
1 files changed, 307 insertions, 182 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index 40da7c8fbe..beac768905 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -18,7 +18,6 @@
# OF THIS SOFTWARE.
#
-import traceback
import ipaddr
import ipaddress
import sys
@@ -28,10 +27,11 @@ from time import sleep
from lib.topolog import logger
from lib.topotest import frr_unicode
from ipaddress import IPv6Address
+import sys
# Import common_config to use commomnly used APIs
from lib.common_config import (
- create_common_configuration,
+ create_common_configurations,
InvalidCLIError,
retry,
generate_ips,
@@ -85,32 +85,36 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru
topo = topo["routers"]
input_dict = deepcopy(input_dict)
- for router in input_dict.keys():
- if "ospf" not in input_dict[router]:
- logger.debug("Router %s: 'ospf' not present in input_dict", router)
- continue
+ for ospf in ["ospf", "ospf6"]:
+ config_data_dict = {}
- result = __create_ospf_global(tgen, input_dict, router, build, load_config)
- if result is True:
- ospf_data = input_dict[router]["ospf"]
-
- for router in input_dict.keys():
- if "ospf6" not in input_dict[router]:
- logger.debug("Router %s: 'ospf6' not present in input_dict", router)
- continue
+ for router in input_dict.keys():
+ if ospf not in input_dict[router]:
+ logger.debug("Router %s: %s not present in input_dict", router, ospf)
+ continue
- result = __create_ospf_global(
- tgen, input_dict, router, build, load_config, ospf="ospf6"
- )
- if result is True:
- ospf_data = input_dict[router]["ospf6"]
+ config_data = __create_ospf_global(
+ tgen, input_dict, router, build, load_config, ospf
+ )
+ if config_data:
+ if router not in config_data_dict:
+ config_data_dict[router] = config_data
+ else:
+ config_data_dict[router].extend(config_data)
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, ospf, build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf (ipv4)", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf()")
return result
def __create_ospf_global(
- tgen, input_dict, router, build=False, load_config=True, ospf="ospf"
+ tgen, input_dict, router, build, load_config, ospf
):
"""
Helper API to create ospf global configuration.
@@ -132,12 +136,12 @@ def __create_ospf_global(
"links": {
"r3": {
"ipv6": "2013:13::1/64",
- "ospf6": {
+ "ospf6": {
"hello_interval": 1,
"dead_interval": 4,
"network": "point-to-point"
}
- }
+ }
},
"ospf6": {
"router_id": "1.1.1.1",
@@ -152,190 +156,221 @@ def __create_ospf_global(
Returns
-------
- True or False
+ list of configuration commands
"""
- result = False
- logger.debug("Entering lib API: __create_ospf_global()")
- try:
+ config_data = []
- ospf_data = input_dict[router][ospf]
- del_ospf_action = ospf_data.setdefault("delete", False)
- if del_ospf_action:
- config_data = ["no router {}".format(ospf)]
- result = create_common_configuration(
- tgen, router, config_data, ospf, build, load_config
- )
- return result
+ if ospf not in input_dict[router]:
+ return config_data
- config_data = []
- cmd = "router {}".format(ospf)
+ logger.debug("Entering lib API: __create_ospf_global()")
+ ospf_data = input_dict[router][ospf]
+ del_ospf_action = ospf_data.setdefault("delete", False)
+ if del_ospf_action:
+ config_data = ["no router {}".format(ospf)]
+ return config_data
+
+ cmd = "router {}".format(ospf)
+
+ config_data.append(cmd)
+
+ # router id
+ router_id = ospf_data.setdefault("router_id", None)
+ del_router_id = ospf_data.setdefault("del_router_id", False)
+ if del_router_id:
+ config_data.append("no {} router-id".format(ospf))
+ if router_id:
+ config_data.append("{} router-id {}".format(ospf, router_id))
+
+ # log-adjacency-changes
+ log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
+ del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
+ if del_log_adj_changes:
+ config_data.append("no log-adjacency-changes detail")
+ if log_adj_changes:
+ config_data.append("log-adjacency-changes {}".format(
+ log_adj_changes))
+
+ # aggregation timer
+ aggr_timer = ospf_data.setdefault("aggr_timer", None)
+ del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
+ if del_aggr_timer:
+ config_data.append("no aggregation timer")
+ if aggr_timer:
+ config_data.append("aggregation timer {}".format(
+ aggr_timer))
+
+ # maximum path information
+ ecmp_data = ospf_data.setdefault("maximum-paths", {})
+ if ecmp_data:
+ cmd = "maximum-paths {}".format(ecmp_data)
+ del_action = ospf_data.setdefault("del_max_path", False)
+ if del_action:
+ cmd = "no maximum-paths"
config_data.append(cmd)
- # router id
- router_id = ospf_data.setdefault("router_id", None)
- del_router_id = ospf_data.setdefault("del_router_id", False)
- if del_router_id:
- config_data.append("no {} router-id".format(ospf))
- if router_id:
- config_data.append("{} router-id {}".format(ospf, router_id))
-
- # log-adjacency-changes
- log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
- del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
- if del_log_adj_changes:
- config_data.append("no log-adjacency-changes detail")
- if log_adj_changes:
- config_data.append("log-adjacency-changes {}".format(log_adj_changes))
-
- # aggregation timer
- aggr_timer = ospf_data.setdefault("aggr_timer", None)
- del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
- if del_aggr_timer:
- config_data.append("no aggregation timer")
- if aggr_timer:
- config_data.append("aggregation timer {}".format(aggr_timer))
-
- # maximum path information
- ecmp_data = ospf_data.setdefault("maximum-paths", {})
- if ecmp_data:
- cmd = "maximum-paths {}".format(ecmp_data)
- del_action = ospf_data.setdefault("del_max_path", False)
+ # redistribute command
+ redistribute_data = ospf_data.setdefault("redistribute", {})
+ if redistribute_data:
+ for redistribute in redistribute_data:
+ if "redist_type" not in redistribute:
+ logger.debug(
+ "Router %s: 'redist_type' not present in " "input_dict", router
+ )
+ else:
+ cmd = "redistribute {}".format(redistribute["redist_type"])
+ for red_type in redistribute_data:
+ if "route_map" in red_type:
+ cmd = cmd + " route-map {}".format(red_type["route_map"])
+ del_action = redistribute.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # area information
+ area_data = ospf_data.setdefault("area", {})
+ if area_data:
+ for area in area_data:
+ if "id" not in area:
+ logger.debug(
+ "Router %s: 'area id' not present in " "input_dict", router
+ )
+ else:
+ cmd = "area {}".format(area["id"])
+
+ if "type" in area:
+ cmd = cmd + " {}".format(area["type"])
+
+ del_action = area.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ #def route information
+ def_rte_data = ospf_data.setdefault("default-information", {})
+ if def_rte_data:
+ if "originate" not in def_rte_data:
+ logger.debug("Router %s: 'originate key' not present in "
+ "input_dict", router)
+ else:
+ cmd = "default-information originate"
+
+ if "always" in def_rte_data:
+ cmd = cmd + " always"
+
+ if "metric" in def_rte_data:
+ cmd = cmd + " metric {}".format(def_rte_data["metric"])
+
+ if "metric-type" in def_rte_data:
+ cmd = cmd + " metric-type {}".format(def_rte_data[
+ "metric-type"])
+
+ if "route-map" in def_rte_data:
+ cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
+
+ del_action = def_rte_data.setdefault("delete", False)
if del_action:
- cmd = "no maximum-paths"
+ cmd = "no {}".format(cmd)
config_data.append(cmd)
- # redistribute command
- redistribute_data = ospf_data.setdefault("redistribute", {})
- if redistribute_data:
- for redistribute in redistribute_data:
- if "redist_type" not in redistribute:
- logger.debug(
- "Router %s: 'redist_type' not present in " "input_dict", router
+ # area interface information for ospf6d only
+ if ospf == "ospf6":
+ area_iface = ospf_data.setdefault("neighbors", {})
+ if area_iface:
+ for neighbor in area_iface:
+ if "area" in area_iface[neighbor]:
+ iface = input_dict[router]["links"][neighbor]["interface"]
+ cmd = "interface {} area {}".format(
+ iface, area_iface[neighbor]["area"]
)
- else:
- cmd = "redistribute {}".format(redistribute["redist_type"])
- for red_type in redistribute_data:
- if "route_map" in red_type:
- cmd = cmd + " route-map {}".format(red_type["route_map"])
- del_action = redistribute.setdefault("delete", False)
- if del_action:
+ if area_iface[neighbor].setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
- # area information
- area_data = ospf_data.setdefault("area", {})
- if area_data:
- for area in area_data:
- if "id" not in area:
- logger.debug(
- "Router %s: 'area id' not present in " "input_dict", router
- )
- else:
- cmd = "area {}".format(area["id"])
+ try:
+ if "area" in input_dict[router]['links'][neighbor][
+ 'ospf6']:
+ iface = input_dict[router]["links"][neighbor]["interface"]
+ cmd = "interface {} area {}".format(
+ iface, input_dict[router]['links'][neighbor][
+ 'ospf6']['area'])
+ if input_dict[router]['links'][neighbor].setdefault(
+ "delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ except KeyError:
+ pass
- if "type" in area:
- cmd = cmd + " {}".format(area["type"])
- del_action = area.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
-
- # def route information
- def_rte_data = ospf_data.setdefault("default-information", {})
- if def_rte_data:
- if "originate" not in def_rte_data:
+ # summary information
+ summary_data = ospf_data.setdefault("summary-address", {})
+ if summary_data:
+ for summary in summary_data:
+ if "prefix" not in summary:
logger.debug(
- "Router %s: 'originate key' not present in " "input_dict", router
+ "Router %s: 'summary-address' not present in " "input_dict",
+ router,
)
else:
- cmd = "default-information originate"
-
- if "always" in def_rte_data:
- cmd = cmd + " always"
-
- if "metric" in def_rte_data:
- cmd = cmd + " metric {}".format(def_rte_data["metric"])
+ cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
- if "metric-type" in def_rte_data:
- cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
+ _tag = summary.setdefault("tag", None)
+ if _tag:
+ cmd = "{} tag {}".format(cmd, _tag)
- if "route-map" in def_rte_data:
- cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
+ _advertise = summary.setdefault("advertise", True)
+ if not _advertise:
+ cmd = "{} no-advertise".format(cmd)
- del_action = def_rte_data.setdefault("delete", False)
+ del_action = summary.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- # area interface information for ospf6d only
- if ospf == "ospf6":
- area_iface = ospf_data.setdefault("neighbors", {})
- if area_iface:
- for neighbor in area_iface:
- if "area" in area_iface[neighbor]:
- iface = input_dict[router]["links"][neighbor]["interface"]
- cmd = "interface {} area {}".format(
- iface, area_iface[neighbor]["area"]
- )
- if area_iface[neighbor].setdefault("delete", False):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
-
- try:
- if "area" in input_dict[router]["links"][neighbor]["ospf6"]:
- iface = input_dict[router]["links"][neighbor]["interface"]
- cmd = "interface {} area {}".format(
- iface,
- input_dict[router]["links"][neighbor]["ospf6"]["area"],
- )
- if input_dict[router]["links"][neighbor].setdefault(
- "delete", False
- ):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
- except KeyError:
- pass
-
- # summary information
- summary_data = ospf_data.setdefault("summary-address", {})
- if summary_data:
- for summary in summary_data:
- if "prefix" not in summary:
- logger.debug(
- "Router %s: 'summary-address' not present in " "input_dict",
- router,
- )
- else:
- cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
+ # ospf gr information
+ gr_data = ospf_data.setdefault("graceful-restart", {})
+ if gr_data:
- _tag = summary.setdefault("tag", None)
- if _tag:
- cmd = "{} tag {}".format(cmd, _tag)
-
- _advertise = summary.setdefault("advertise", True)
- if not _advertise:
- cmd = "{} no-advertise".format(cmd)
+ if "opaque" in gr_data and gr_data["opaque"]:
+ cmd = "capability opaque"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = summary.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ if "helper-only" in gr_data and not gr_data["helper-only"]:
+ cmd = "graceful-restart helper-only"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list:
+ for rtrs in gr_data["helper-only"]:
+ cmd = "graceful-restart helper-only {}".format(rtrs)
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- result = create_common_configuration(
- tgen, router, config_data, ospf, build, load_config
- )
+ if "helper" in gr_data:
+ if type(gr_data["helper"]) is not list:
+ gr_data["helper"] = list(gr_data["helper"])
+ for helper_role in gr_data["helper"]:
+ cmd = "graceful-restart helper {}".format(helper_role)
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ if "supported-grace-time" in gr_data:
+ cmd = "graceful-restart helper supported-grace-time {}".format(
+ gr_data["supported-grace-time"]
+ )
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
logger.debug("Exiting lib API: create_ospf_global()")
- return result
+
+ return config_data
def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True):
@@ -370,14 +405,27 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
if "ospf6" not in input_dict[router]:
logger.debug("Router %s: 'ospf6' not present in input_dict", router)
continue
- result = __create_ospf_global(
+ config_data = __create_ospf_global(
tgen, input_dict, router, build, load_config, "ospf6"
)
+ if config_data:
+ config_data_dict[router] = config_data
+
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "ospf6", build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf6", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf6()")
return result
@@ -422,6 +470,9 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]["links"].keys():
@@ -506,10 +557,14 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
if build:
return config_data
- else:
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
logger.debug("Exiting lib API: config_ospf_interface()")
return result
@@ -2299,6 +2354,9 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]['links'].keys():
@@ -2369,9 +2427,76 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
if build:
return config_data
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+@retry(retry_timeout=20)
+def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
+ """
+ This API is used to vreify gr helper using command
+ show ip ospf graceful-restart helper
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : topology descriptions
+ * 'dut' : router
+ * 'input_dict' - values to be verified
+
+ Usage:
+ -------
+ input_dict = {
+ "helperSupport":"Disabled",
+ "strictLsaCheck":"Enabled",
+ "restartSupoort":"Planned and Unplanned Restarts",
+ "supportedGracePeriod":1800
+ }
+ result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
+
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+
+ if 'ospf' not in topo['routers'][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
+ dut)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+ logger.info("Verifying OSPF GR details on router %s:", dut)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json",
+ isjson=True)
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ raise ValueError (errormsg)
+ return errormsg
+
+ for ospf_gr, gr_data in input_dict.items():
+ try:
+ if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
+ logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr,
+ show_ospf_json[ospf_gr])
+ result = True
else:
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
+ errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
+ "is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[
+ ospf_gr]))
+ raise ValueError (errormsg)
+ return errormsg
+
+ except KeyError:
+ errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr))
+ return errormsg
+
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result