summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py448
1 files changed, 235 insertions, 213 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index 9646daf725..beac768905 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -18,7 +18,6 @@
# OF THIS SOFTWARE.
#
-import traceback
import ipaddr
import ipaddress
import sys
@@ -32,7 +31,7 @@ import sys
# Import common_config to use commomnly used APIs
from lib.common_config import (
- create_common_configuration,
+ create_common_configurations,
InvalidCLIError,
retry,
generate_ips,
@@ -86,32 +85,36 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru
topo = topo["routers"]
input_dict = deepcopy(input_dict)
- for router in input_dict.keys():
- if "ospf" not in input_dict[router]:
- logger.debug("Router %s: 'ospf' not present in input_dict", router)
- continue
+ for ospf in ["ospf", "ospf6"]:
+ config_data_dict = {}
- result = __create_ospf_global(tgen, input_dict, router, build, load_config)
- if result is True:
- ospf_data = input_dict[router]["ospf"]
-
- for router in input_dict.keys():
- if "ospf6" not in input_dict[router]:
- logger.debug("Router %s: 'ospf6' not present in input_dict", router)
- continue
+ for router in input_dict.keys():
+ if ospf not in input_dict[router]:
+ logger.debug("Router %s: %s not present in input_dict", router, ospf)
+ continue
- result = __create_ospf_global(
- tgen, input_dict, router, build, load_config, ospf="ospf6"
- )
- if result is True:
- ospf_data = input_dict[router]["ospf6"]
+ config_data = __create_ospf_global(
+ tgen, input_dict, router, build, load_config, ospf
+ )
+ if config_data:
+ if router not in config_data_dict:
+ config_data_dict[router] = config_data
+ else:
+ config_data_dict[router].extend(config_data)
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, ospf, build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf (ipv4)", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf()")
return result
def __create_ospf_global(
- tgen, input_dict, router, build=False, load_config=True, ospf="ospf"
+ tgen, input_dict, router, build, load_config, ospf
):
"""
Helper API to create ospf global configuration.
@@ -133,12 +136,12 @@ def __create_ospf_global(
"links": {
"r3": {
"ipv6": "2013:13::1/64",
- "ospf6": {
+ "ospf6": {
"hello_interval": 1,
"dead_interval": 4,
"network": "point-to-point"
}
- }
+ }
},
"ospf6": {
"router_id": "1.1.1.1",
@@ -153,229 +156,221 @@ def __create_ospf_global(
Returns
-------
- True or False
+ list of configuration commands
"""
- result = False
- logger.debug("Entering lib API: __create_ospf_global()")
- try:
+ config_data = []
- ospf_data = input_dict[router][ospf]
- del_ospf_action = ospf_data.setdefault("delete", False)
- if del_ospf_action:
- config_data = ["no router {}".format(ospf)]
- result = create_common_configuration(
- tgen, router, config_data, ospf, build, load_config
- )
- return result
+ if ospf not in input_dict[router]:
+ return config_data
- config_data = []
- cmd = "router {}".format(ospf)
+ logger.debug("Entering lib API: __create_ospf_global()")
+ ospf_data = input_dict[router][ospf]
+ del_ospf_action = ospf_data.setdefault("delete", False)
+ if del_ospf_action:
+ config_data = ["no router {}".format(ospf)]
+ return config_data
+
+ cmd = "router {}".format(ospf)
+
+ config_data.append(cmd)
+
+ # router id
+ router_id = ospf_data.setdefault("router_id", None)
+ del_router_id = ospf_data.setdefault("del_router_id", False)
+ if del_router_id:
+ config_data.append("no {} router-id".format(ospf))
+ if router_id:
+ config_data.append("{} router-id {}".format(ospf, router_id))
+
+ # log-adjacency-changes
+ log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
+ del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
+ if del_log_adj_changes:
+ config_data.append("no log-adjacency-changes detail")
+ if log_adj_changes:
+ config_data.append("log-adjacency-changes {}".format(
+ log_adj_changes))
+
+ # aggregation timer
+ aggr_timer = ospf_data.setdefault("aggr_timer", None)
+ del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
+ if del_aggr_timer:
+ config_data.append("no aggregation timer")
+ if aggr_timer:
+ config_data.append("aggregation timer {}".format(
+ aggr_timer))
+
+ # maximum path information
+ ecmp_data = ospf_data.setdefault("maximum-paths", {})
+ if ecmp_data:
+ cmd = "maximum-paths {}".format(ecmp_data)
+ del_action = ospf_data.setdefault("del_max_path", False)
+ if del_action:
+ cmd = "no maximum-paths"
config_data.append(cmd)
- # router id
- router_id = ospf_data.setdefault("router_id", None)
- del_router_id = ospf_data.setdefault("del_router_id", False)
- if del_router_id:
- config_data.append("no {} router-id".format(ospf))
- if router_id:
- config_data.append("{} router-id {}".format(ospf, router_id))
-
- # log-adjacency-changes
- log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
- del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
- if del_log_adj_changes:
- config_data.append("no log-adjacency-changes detail")
- if log_adj_changes:
- config_data.append("log-adjacency-changes {}".format(log_adj_changes))
-
- # aggregation timer
- aggr_timer = ospf_data.setdefault("aggr_timer", None)
- del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
- if del_aggr_timer:
- config_data.append("no aggregation timer")
- if aggr_timer:
- config_data.append("aggregation timer {}".format(aggr_timer))
-
- # maximum path information
- ecmp_data = ospf_data.setdefault("maximum-paths", {})
- if ecmp_data:
- cmd = "maximum-paths {}".format(ecmp_data)
- del_action = ospf_data.setdefault("del_max_path", False)
- if del_action:
- cmd = "no maximum-paths"
- config_data.append(cmd)
+ # redistribute command
+ redistribute_data = ospf_data.setdefault("redistribute", {})
+ if redistribute_data:
+ for redistribute in redistribute_data:
+ if "redist_type" not in redistribute:
+ logger.debug(
+ "Router %s: 'redist_type' not present in " "input_dict", router
+ )
+ else:
+ cmd = "redistribute {}".format(redistribute["redist_type"])
+ for red_type in redistribute_data:
+ if "route_map" in red_type:
+ cmd = cmd + " route-map {}".format(red_type["route_map"])
+ del_action = redistribute.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- # redistribute command
- redistribute_data = ospf_data.setdefault("redistribute", {})
- if redistribute_data:
- for redistribute in redistribute_data:
- if "redist_type" not in redistribute:
- logger.debug(
- "Router %s: 'redist_type' not present in " "input_dict", router
- )
- else:
- cmd = "redistribute {}".format(redistribute["redist_type"])
- for red_type in redistribute_data:
- if "route_map" in red_type:
- cmd = cmd + " route-map {}".format(red_type["route_map"])
- del_action = redistribute.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # area information
+ area_data = ospf_data.setdefault("area", {})
+ if area_data:
+ for area in area_data:
+ if "id" not in area:
+ logger.debug(
+ "Router %s: 'area id' not present in " "input_dict", router
+ )
+ else:
+ cmd = "area {}".format(area["id"])
- # area information
- area_data = ospf_data.setdefault("area", {})
- if area_data:
- for area in area_data:
- if "id" not in area:
- logger.debug(
- "Router %s: 'area id' not present in " "input_dict", router
- )
- else:
- cmd = "area {}".format(area["id"])
+ if "type" in area:
+ cmd = cmd + " {}".format(area["type"])
- if "type" in area:
- cmd = cmd + " {}".format(area["type"])
+ del_action = area.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = area.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ #def route information
+ def_rte_data = ospf_data.setdefault("default-information", {})
+ if def_rte_data:
+ if "originate" not in def_rte_data:
+ logger.debug("Router %s: 'originate key' not present in "
+ "input_dict", router)
+ else:
+ cmd = "default-information originate"
- # def route information
- def_rte_data = ospf_data.setdefault("default-information", {})
- if def_rte_data:
- if "originate" not in def_rte_data:
- logger.debug(
- "Router %s: 'originate key' not present in " "input_dict", router
- )
- else:
- cmd = "default-information originate"
+ if "always" in def_rte_data:
+ cmd = cmd + " always"
- if "always" in def_rte_data:
- cmd = cmd + " always"
+ if "metric" in def_rte_data:
+ cmd = cmd + " metric {}".format(def_rte_data["metric"])
- if "metric" in def_rte_data:
- cmd = cmd + " metric {}".format(def_rte_data["metric"])
+ if "metric-type" in def_rte_data:
+ cmd = cmd + " metric-type {}".format(def_rte_data[
+ "metric-type"])
- if "metric-type" in def_rte_data:
- cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
+ if "route-map" in def_rte_data:
+ cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
- if "route-map" in def_rte_data:
- cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
+ del_action = def_rte_data.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = def_rte_data.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # area interface information for ospf6d only
+ if ospf == "ospf6":
+ area_iface = ospf_data.setdefault("neighbors", {})
+ if area_iface:
+ for neighbor in area_iface:
+ if "area" in area_iface[neighbor]:
+ iface = input_dict[router]["links"][neighbor]["interface"]
+ cmd = "interface {} area {}".format(
+ iface, area_iface[neighbor]["area"]
+ )
+ if area_iface[neighbor].setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- # area interface information for ospf6d only
- if ospf == "ospf6":
- area_iface = ospf_data.setdefault("neighbors", {})
- if area_iface:
- for neighbor in area_iface:
- if "area" in area_iface[neighbor]:
+ try:
+ if "area" in input_dict[router]['links'][neighbor][
+ 'ospf6']:
iface = input_dict[router]["links"][neighbor]["interface"]
cmd = "interface {} area {}".format(
- iface, area_iface[neighbor]["area"]
- )
- if area_iface[neighbor].setdefault("delete", False):
+ iface, input_dict[router]['links'][neighbor][
+ 'ospf6']['area'])
+ if input_dict[router]['links'][neighbor].setdefault(
+ "delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
-
- try:
- if "area" in input_dict[router]["links"][neighbor]["ospf6"]:
- iface = input_dict[router]["links"][neighbor]["interface"]
- cmd = "interface {} area {}".format(
- iface,
- input_dict[router]["links"][neighbor]["ospf6"]["area"],
- )
- if input_dict[router]["links"][neighbor].setdefault(
- "delete", False
- ):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
- except KeyError:
+ except KeyError:
pass
- # summary information
- summary_data = ospf_data.setdefault("summary-address", {})
- if summary_data:
- for summary in summary_data:
- if "prefix" not in summary:
- logger.debug(
- "Router %s: 'summary-address' not present in " "input_dict",
- router,
- )
- else:
- cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
-
- _tag = summary.setdefault("tag", None)
- if _tag:
- cmd = "{} tag {}".format(cmd, _tag)
- _advertise = summary.setdefault("advertise", True)
- if not _advertise:
- cmd = "{} no-advertise".format(cmd)
+ # summary information
+ summary_data = ospf_data.setdefault("summary-address", {})
+ if summary_data:
+ for summary in summary_data:
+ if "prefix" not in summary:
+ logger.debug(
+ "Router %s: 'summary-address' not present in " "input_dict",
+ router,
+ )
+ else:
+ cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
- del_action = summary.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ _tag = summary.setdefault("tag", None)
+ if _tag:
+ cmd = "{} tag {}".format(cmd, _tag)
- # ospf gr information
- gr_data = ospf_data.setdefault("graceful-restart", {})
- if gr_data:
+ _advertise = summary.setdefault("advertise", True)
+ if not _advertise:
+ cmd = "{} no-advertise".format(cmd)
- if "opaque" in gr_data and gr_data["opaque"]:
- cmd = "capability opaque"
- if gr_data.setdefault("delete", False):
+ del_action = summary.setdefault("delete", False)
+ if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- if "helper-only" in gr_data and not gr_data["helper-only"]:
- cmd = "graceful-restart helper-only"
+ # ospf gr information
+ gr_data = ospf_data.setdefault("graceful-restart", {})
+ if gr_data:
+
+ if "opaque" in gr_data and gr_data["opaque"]:
+ cmd = "capability opaque"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "helper-only" in gr_data and not gr_data["helper-only"]:
+ cmd = "graceful-restart helper-only"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list:
+ for rtrs in gr_data["helper-only"]:
+ cmd = "graceful-restart helper-only {}".format(rtrs)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
- elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list:
- for rtrs in gr_data["helper-only"]:
- cmd = "graceful-restart helper-only {}".format(rtrs)
- if gr_data.setdefault("delete", False):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
-
- if "helper" in gr_data:
- if type(gr_data["helper"]) is not list:
- gr_data["helper"] = list(gr_data["helper"])
- for helper_role in gr_data["helper"]:
- cmd = "graceful-restart helper {}".format(helper_role)
- if gr_data.setdefault("delete", False):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
- if "supported-grace-time" in gr_data:
- cmd = "graceful-restart helper supported-grace-time {}".format(
- gr_data["supported-grace-time"]
- )
+ if "helper" in gr_data:
+ if type(gr_data["helper"]) is not list:
+ gr_data["helper"] = list(gr_data["helper"])
+ for helper_role in gr_data["helper"]:
+ cmd = "graceful-restart helper {}".format(helper_role)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
- result = create_common_configuration(
- tgen, router, config_data, "ospf", build, load_config
- )
-
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ if "supported-grace-time" in gr_data:
+ cmd = "graceful-restart helper supported-grace-time {}".format(
+ gr_data["supported-grace-time"]
+ )
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
logger.debug("Exiting lib API: create_ospf_global()")
- return result
+
+ return config_data
def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True):
@@ -410,14 +405,27 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
if "ospf6" not in input_dict[router]:
logger.debug("Router %s: 'ospf6' not present in input_dict", router)
continue
- result = __create_ospf_global(
+ config_data = __create_ospf_global(
tgen, input_dict, router, build, load_config, "ospf6"
)
+ if config_data:
+ config_data_dict[router] = config_data
+
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "ospf6", build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf6", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf6()")
return result
@@ -462,6 +470,9 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]["links"].keys():
@@ -546,10 +557,14 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
if build:
return config_data
- else:
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
logger.debug("Exiting lib API: config_ospf_interface()")
return result
@@ -2339,6 +2354,9 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]['links'].keys():
@@ -2409,10 +2427,14 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
if build:
return config_data
- else:
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result