summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py1310
1 files changed, 789 insertions, 521 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index dc9fe0fcca..beac768905 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -18,7 +18,6 @@
# OF THIS SOFTWARE.
#
-import traceback
import ipaddr
import ipaddress
import sys
@@ -28,9 +27,11 @@ from time import sleep
from lib.topolog import logger
from lib.topotest import frr_unicode
from ipaddress import IPv6Address
+import sys
+
# Import common_config to use commomnly used APIs
from lib.common_config import (
- create_common_configuration,
+ create_common_configurations,
InvalidCLIError,
retry,
generate_ips,
@@ -84,32 +85,36 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru
topo = topo["routers"]
input_dict = deepcopy(input_dict)
- for router in input_dict.keys():
- if "ospf" not in input_dict[router]:
- logger.debug("Router %s: 'ospf' not present in input_dict", router)
- continue
-
- result = __create_ospf_global(
- tgen, input_dict, router, build, load_config)
- if result is True:
- ospf_data = input_dict[router]["ospf"]
+ for ospf in ["ospf", "ospf6"]:
+ config_data_dict = {}
- for router in input_dict.keys():
- if "ospf6" not in input_dict[router]:
- logger.debug("Router %s: 'ospf6' not present in input_dict", router)
- continue
+ for router in input_dict.keys():
+ if ospf not in input_dict[router]:
+ logger.debug("Router %s: %s not present in input_dict", router, ospf)
+ continue
- result = __create_ospf_global(
- tgen, input_dict, router, build, load_config, ospf='ospf6')
- if result is True:
- ospf_data = input_dict[router]["ospf6"]
+ config_data = __create_ospf_global(
+ tgen, input_dict, router, build, load_config, ospf
+ )
+ if config_data:
+ if router not in config_data_dict:
+ config_data_dict[router] = config_data
+ else:
+ config_data_dict[router].extend(config_data)
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, ospf, build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf (ipv4)", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf()")
return result
def __create_ospf_global(
- tgen, input_dict, router, build=False, load_config=True, ospf="ospf"
+ tgen, input_dict, router, build, load_config, ospf
):
"""
Helper API to create ospf global configuration.
@@ -131,12 +136,12 @@ def __create_ospf_global(
"links": {
"r3": {
"ipv6": "2013:13::1/64",
- "ospf6": {
+ "ospf6": {
"hello_interval": 1,
"dead_interval": 4,
"network": "point-to-point"
}
- }
+ }
},
"ospf6": {
"router_id": "1.1.1.1",
@@ -151,194 +156,221 @@ def __create_ospf_global(
Returns
-------
- True or False
+ list of configuration commands
"""
- result = False
- logger.debug("Entering lib API: __create_ospf_global()")
- try:
+ config_data = []
- ospf_data = input_dict[router][ospf]
- del_ospf_action = ospf_data.setdefault("delete", False)
- if del_ospf_action:
- config_data = ["no router {}".format(ospf)]
- result = create_common_configuration(
- tgen, router, config_data, ospf, build, load_config
- )
- return result
+ if ospf not in input_dict[router]:
+ return config_data
- config_data = []
- cmd = "router {}".format(ospf)
+ logger.debug("Entering lib API: __create_ospf_global()")
+ ospf_data = input_dict[router][ospf]
+ del_ospf_action = ospf_data.setdefault("delete", False)
+ if del_ospf_action:
+ config_data = ["no router {}".format(ospf)]
+ return config_data
+
+ cmd = "router {}".format(ospf)
+
+ config_data.append(cmd)
+
+ # router id
+ router_id = ospf_data.setdefault("router_id", None)
+ del_router_id = ospf_data.setdefault("del_router_id", False)
+ if del_router_id:
+ config_data.append("no {} router-id".format(ospf))
+ if router_id:
+ config_data.append("{} router-id {}".format(ospf, router_id))
+
+ # log-adjacency-changes
+ log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
+ del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
+ if del_log_adj_changes:
+ config_data.append("no log-adjacency-changes detail")
+ if log_adj_changes:
+ config_data.append("log-adjacency-changes {}".format(
+ log_adj_changes))
+
+ # aggregation timer
+ aggr_timer = ospf_data.setdefault("aggr_timer", None)
+ del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
+ if del_aggr_timer:
+ config_data.append("no aggregation timer")
+ if aggr_timer:
+ config_data.append("aggregation timer {}".format(
+ aggr_timer))
+
+ # maximum path information
+ ecmp_data = ospf_data.setdefault("maximum-paths", {})
+ if ecmp_data:
+ cmd = "maximum-paths {}".format(ecmp_data)
+ del_action = ospf_data.setdefault("del_max_path", False)
+ if del_action:
+ cmd = "no maximum-paths"
config_data.append(cmd)
+ # redistribute command
+ redistribute_data = ospf_data.setdefault("redistribute", {})
+ if redistribute_data:
+ for redistribute in redistribute_data:
+ if "redist_type" not in redistribute:
+ logger.debug(
+ "Router %s: 'redist_type' not present in " "input_dict", router
+ )
+ else:
+ cmd = "redistribute {}".format(redistribute["redist_type"])
+ for red_type in redistribute_data:
+ if "route_map" in red_type:
+ cmd = cmd + " route-map {}".format(red_type["route_map"])
+ del_action = redistribute.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- # router id
- router_id = ospf_data.setdefault("router_id", None)
- del_router_id = ospf_data.setdefault("del_router_id", False)
- if del_router_id:
- config_data.append("no {} router-id".format(ospf))
- if router_id:
- config_data.append("{} router-id {}".format(ospf, router_id))
-
- # log-adjacency-changes
- log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
- del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
- if del_log_adj_changes:
- config_data.append("no log-adjacency-changes detail")
- if log_adj_changes:
- config_data.append("log-adjacency-changes {}".format(
- log_adj_changes))
-
- # aggregation timer
- aggr_timer = ospf_data.setdefault("aggr_timer", None)
- del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
- if del_aggr_timer:
- config_data.append("no aggregation timer")
- if aggr_timer:
- config_data.append("aggregation timer {}".format(
- aggr_timer))
-
- # maximum path information
- ecmp_data = ospf_data.setdefault("maximum-paths", {})
- if ecmp_data:
- cmd = "maximum-paths {}".format(ecmp_data)
- del_action = ospf_data.setdefault("del_max_path", False)
- if del_action:
- cmd = "no maximum-paths"
- config_data.append(cmd)
-
- # redistribute command
- redistribute_data = ospf_data.setdefault("redistribute", {})
- if redistribute_data:
- for redistribute in redistribute_data:
- if "redist_type" not in redistribute:
- logger.debug(
- "Router %s: 'redist_type' not present in " "input_dict", router
- )
- else:
- cmd = "redistribute {}".format(redistribute["redist_type"])
- for red_type in redistribute_data:
- if "route_map" in red_type:
- cmd = cmd + " route-map {}".format(red_type["route_map"])
- del_action = redistribute.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # area information
+ area_data = ospf_data.setdefault("area", {})
+ if area_data:
+ for area in area_data:
+ if "id" not in area:
+ logger.debug(
+ "Router %s: 'area id' not present in " "input_dict", router
+ )
+ else:
+ cmd = "area {}".format(area["id"])
- # area information
- area_data = ospf_data.setdefault("area", {})
- if area_data:
- for area in area_data:
- if "id" not in area:
- logger.debug(
- "Router %s: 'area id' not present in " "input_dict", router
- )
- else:
- cmd = "area {}".format(area["id"])
+ if "type" in area:
+ cmd = cmd + " {}".format(area["type"])
- if "type" in area:
- cmd = cmd + " {}".format(area["type"])
+ del_action = area.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = area.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ #def route information
+ def_rte_data = ospf_data.setdefault("default-information", {})
+ if def_rte_data:
+ if "originate" not in def_rte_data:
+ logger.debug("Router %s: 'originate key' not present in "
+ "input_dict", router)
+ else:
+ cmd = "default-information originate"
- #def route information
- def_rte_data = ospf_data.setdefault("default-information", {})
- if def_rte_data:
- if "originate" not in def_rte_data:
- logger.debug("Router %s: 'originate key' not present in "
- "input_dict", router)
- else:
- cmd = "default-information originate"
+ if "always" in def_rte_data:
+ cmd = cmd + " always"
- if "always" in def_rte_data:
- cmd = cmd + " always"
+ if "metric" in def_rte_data:
+ cmd = cmd + " metric {}".format(def_rte_data["metric"])
- if "metric" in def_rte_data:
- cmd = cmd + " metric {}".format(def_rte_data["metric"])
+ if "metric-type" in def_rte_data:
+ cmd = cmd + " metric-type {}".format(def_rte_data[
+ "metric-type"])
- if "metric-type" in def_rte_data:
- cmd = cmd + " metric-type {}".format(def_rte_data[
- "metric-type"])
+ if "route-map" in def_rte_data:
+ cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
- if "route-map" in def_rte_data:
- cmd = cmd + " route-map {}".format(def_rte_data[
- "route-map"])
+ del_action = def_rte_data.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = def_rte_data.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # area interface information for ospf6d only
+ if ospf == "ospf6":
+ area_iface = ospf_data.setdefault("neighbors", {})
+ if area_iface:
+ for neighbor in area_iface:
+ if "area" in area_iface[neighbor]:
+ iface = input_dict[router]["links"][neighbor]["interface"]
+ cmd = "interface {} area {}".format(
+ iface, area_iface[neighbor]["area"]
+ )
+ if area_iface[neighbor].setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- # area interface information for ospf6d only
- if ospf == "ospf6":
- area_iface = ospf_data.setdefault("neighbors", {})
- if area_iface:
- for neighbor in area_iface:
- if "area" in area_iface[neighbor]:
+ try:
+ if "area" in input_dict[router]['links'][neighbor][
+ 'ospf6']:
iface = input_dict[router]["links"][neighbor]["interface"]
cmd = "interface {} area {}".format(
- iface, area_iface[neighbor]["area"]
- )
- if area_iface[neighbor].setdefault("delete", False):
+ iface, input_dict[router]['links'][neighbor][
+ 'ospf6']['area'])
+ if input_dict[router]['links'][neighbor].setdefault(
+ "delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
+ except KeyError:
+ pass
- try:
- if "area" in input_dict[router]['links'][neighbor][
- 'ospf6']:
- iface = input_dict[router]["links"][neighbor]["interface"]
- cmd = "interface {} area {}".format(
- iface, input_dict[router]['links'][neighbor][
- 'ospf6']['area'])
- if input_dict[router]['links'][neighbor].setdefault(
- "delete", False):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
- except KeyError:
- pass
+ # summary information
+ summary_data = ospf_data.setdefault("summary-address", {})
+ if summary_data:
+ for summary in summary_data:
+ if "prefix" not in summary:
+ logger.debug(
+ "Router %s: 'summary-address' not present in " "input_dict",
+ router,
+ )
+ else:
+ cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
- # summary information
- summary_data = ospf_data.setdefault("summary-address", {})
- if summary_data:
- for summary in summary_data:
- if "prefix" not in summary:
- logger.debug(
- "Router %s: 'summary-address' not present in " "input_dict",
- router,
- )
- else:
- cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
+ _tag = summary.setdefault("tag", None)
+ if _tag:
+ cmd = "{} tag {}".format(cmd, _tag)
- _tag = summary.setdefault("tag", None)
- if _tag:
- cmd = "{} tag {}".format(cmd, _tag)
+ _advertise = summary.setdefault("advertise", True)
+ if not _advertise:
+ cmd = "{} no-advertise".format(cmd)
- _advertise = summary.setdefault("advertise", True)
- if not _advertise:
- cmd = "{} no-advertise".format(cmd)
+ del_action = summary.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = summary.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # ospf gr information
+ gr_data = ospf_data.setdefault("graceful-restart", {})
+ if gr_data:
- result = create_common_configuration(
- tgen, router, config_data, ospf, build, load_config
- )
+ if "opaque" in gr_data and gr_data["opaque"]:
+ cmd = "capability opaque"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ if "helper-only" in gr_data and not gr_data["helper-only"]:
+ cmd = "graceful-restart helper-only"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list:
+ for rtrs in gr_data["helper-only"]:
+ cmd = "graceful-restart helper-only {}".format(rtrs)
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "helper" in gr_data:
+ if type(gr_data["helper"]) is not list:
+ gr_data["helper"] = list(gr_data["helper"])
+ for helper_role in gr_data["helper"]:
+ cmd = "graceful-restart helper {}".format(helper_role)
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "supported-grace-time" in gr_data:
+ cmd = "graceful-restart helper supported-grace-time {}".format(
+ gr_data["supported-grace-time"]
+ )
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
logger.debug("Exiting lib API: create_ospf_global()")
- return result
+
+ return config_data
def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True):
@@ -373,14 +405,27 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
if "ospf6" not in input_dict[router]:
logger.debug("Router %s: 'ospf6' not present in input_dict", router)
continue
- result = __create_ospf_global(
+ config_data = __create_ospf_global(
tgen, input_dict, router, build, load_config, "ospf6"
)
+ if config_data:
+ config_data_dict[router] = config_data
+
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "ospf6", build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf6", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf6()")
return result
@@ -420,10 +465,14 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
True or False
"""
logger.debug("Enter lib config_ospf_interface")
+ result = False
if not input_dict:
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]["links"].keys():
@@ -502,16 +551,20 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
# interface ospf mtu
if data_ospf_mtu:
cmd = "ip ospf mtu-ignore"
- if 'del_action' in ospf_data:
+ if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if build:
return config_data
- else:
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
logger.debug("Exiting lib API: config_ospf_interface()")
return result
@@ -543,8 +596,7 @@ def clear_ospf(tgen, router, ospf=None):
version = "ip"
cmd = "clear {} ospf interface".format(version)
- logger.info(
- "Clearing ospf process on router %s.. using command '%s'", router, cmd)
+ logger.info("Clearing ospf process on router %s.. using command '%s'", router, cmd)
run_frr_cmd(rnode, cmd)
logger.debug("Exiting lib API: clear_ospf()")
@@ -774,7 +826,7 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False, expec
################################
# Verification procs
################################
-@retry(retry_timeout=20)
+@retry(retry_timeout=50)
def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
"""
This API is to verify ospf neighborship by running
@@ -825,105 +877,133 @@ def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
if input_dict:
for router, rnode in tgen.routers().items():
- if 'ospf6' not in topo['routers'][router]:
+ if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
- show_ospf_json = run_frr_cmd(rnode,
- "show ipv6 ospf neighbor json", isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf neighbor json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF6 is not running"
return errormsg
ospf_data_list = input_dict[router]["ospf6"]
- ospf_nbr_list = ospf_data_list['neighbors']
+ ospf_nbr_list = ospf_data_list["neighbors"]
for ospf_nbr, nbr_data in ospf_nbr_list.items():
- data_ip = data_rid = topo['routers'][ospf_nbr]['ospf6']['router_id']
+
+ try:
+ data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
+ except KeyError:
+ data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
+ "router_id"
+ ]
+
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
- for switch in topo['switches']:
- if 'ospf6' in topo['switches'][switch]['links'][router]:
+ for switch in topo["switches"]:
+ if "ospf6" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip
else:
continue
else:
- neighbor_ip = data_ip[router]['ipv6'].split("/")[0]
+ neighbor_ip = data_ip[router]["ipv6"].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
- get_index_val = dict((d['neighborId'], dict( \
- d, index=index)) for (index, d) in enumerate( \
- show_ospf_json['neighbors']))
+ get_index_val = dict(
+ (d["neighborId"], dict(d, index=index))
+ for (index, d) in enumerate(show_ospf_json["neighbors"])
+ )
try:
- nh_state = get_index_val.get(neighbor_ip)['state']
- intf_state = get_index_val.get(neighbor_ip)['ifState']
+ nh_state = get_index_val.get(neighbor_ip)["state"]
+ intf_state = get_index_val.get(neighbor_ip)["ifState"]
except TypeError:
- errormsg = "[DUT: {}] OSPF peer {} missing,from "\
- "{} ".format(router,
- nbr_rid, ospf_nbr)
+ errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
+ router, nbr_rid, ospf_nbr
+ )
return errormsg
- nbr_state = nbr_data.setdefault("state",None)
- nbr_role = nbr_data.setdefault("role",None)
+ nbr_state = nbr_data.setdefault("state", None)
+ nbr_role = nbr_data.setdefault("role", None)
if nbr_state:
if nbr_state == nh_state:
- logger.info("[DUT: {}] OSPF6 Nbr is {}:{} State {}".format
- (router, ospf_nbr, nbr_rid, nh_state))
+ logger.info(
+ "[DUT: {}] OSPF6 Nbr is {}:{} State {}".format(
+ router, ospf_nbr, nbr_rid, nh_state
+ )
+ )
result = True
else:
- errormsg = ("[DUT: {}] OSPF6 is not Converged, neighbor"
- " state is {} , Expected state is {}".format(router,
- nh_state, nbr_state))
+ errormsg = (
+ "[DUT: {}] OSPF6 is not Converged, neighbor"
+ " state is {} , Expected state is {}".format(
+ router, nh_state, nbr_state
+ )
+ )
return errormsg
if nbr_role:
if nbr_role == intf_state:
- logger.info("[DUT: {}] OSPF6 Nbr is {}: {} Role {}".format(
- router, ospf_nbr, nbr_rid, nbr_role))
+ logger.info(
+ "[DUT: {}] OSPF6 Nbr is {}: {} Role {}".format(
+ router, ospf_nbr, nbr_rid, nbr_role
+ )
+ )
else:
- errormsg = ("[DUT: {}] OSPF6 is not Converged with rid"
- "{}, role is {}, Expected role is {}".format(router,
- nbr_rid, intf_state, nbr_role))
+ errormsg = (
+ "[DUT: {}] OSPF6 is not Converged with rid"
+ "{}, role is {}, Expected role is {}".format(
+ router, nbr_rid, intf_state, nbr_role
+ )
+ )
return errormsg
continue
else:
for router, rnode in tgen.routers().items():
- if 'ospf6' not in topo['routers'][router]:
+ if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF6 neighborship on router %s:", router)
- show_ospf_json = run_frr_cmd(rnode,
- "show ipv6 ospf neighbor json", isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf neighbor json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF6 is not running"
return errormsg
ospf_data_list = topo["routers"][router]["ospf6"]
- ospf_neighbors = ospf_data_list['neighbors']
+ ospf_neighbors = ospf_data_list["neighbors"]
total_peer = 0
total_peer = len(ospf_neighbors.keys())
no_of_ospf_nbr = 0
- ospf_nbr_list = ospf_data_list['neighbors']
+ ospf_nbr_list = ospf_data_list["neighbors"]
no_of_peer = 0
for ospf_nbr, nbr_data in ospf_nbr_list.items():
- data_ip = data_rid = topo['routers'][ospf_nbr]['ospf6']['router_id']
+ try:
+ data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
+ except KeyError:
+ data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
+ "router_id"
+ ]
+
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
- for switch in topo['switches']:
- if 'ospf6' in topo['switches'][switch]['links'][router]:
+ for switch in topo["switches"]:
+ if "ospf6" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip
else:
continue
@@ -933,26 +1013,27 @@ def verify_ospf6_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
- get_index_val = dict((d['neighborId'], dict( \
- d, index=index)) for (index, d) in enumerate( \
- show_ospf_json['neighbors']))
+ get_index_val = dict(
+ (d["neighborId"], dict(d, index=index))
+ for (index, d) in enumerate(show_ospf_json["neighbors"])
+ )
try:
- nh_state = get_index_val.get(neighbor_ip)['state']
- intf_state = get_index_val.get(neighbor_ip)['ifState']
+ nh_state = get_index_val.get(neighbor_ip)["state"]
+ intf_state = get_index_val.get(neighbor_ip)["ifState"]
except TypeError:
- errormsg = "[DUT: {}] OSPF peer {} missing,from "\
- "{} ".format(router,
- nbr_rid, ospf_nbr)
+ errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
+ router, nbr_rid, ospf_nbr
+ )
return errormsg
- if nh_state == 'Full':
+ if nh_state == "Full":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("[DUT: {}] OSPF6 is Converged".format(router))
result = True
else:
- errormsg = ("[DUT: {}] OSPF6 is not Converged".format(router))
+ errormsg = "[DUT: {}] OSPF6 is not Converged".format(router)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
@@ -1491,7 +1572,7 @@ def verify_ospf_database(tgen, topo, dut, input_dict, expected=True):
@retry(retry_timeout=20)
-def verify_ospf_summary(tgen, topo, dut, input_dict, expected=True):
+def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
@@ -1502,7 +1583,6 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, expected=True):
* `topo` : topology descriptions
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
- * `expected` : expected results from API, by-default True
Usage
-----
@@ -1522,18 +1602,30 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, expected=True):
True or False (Error Message)
"""
- logger.debug("Entering lib API: verify_ospf_summary()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
router = dut
logger.info("Verifying OSPF summary on router %s:", router)
- if "ospf" not in topo["routers"][dut]:
- errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
- return errormsg
-
rnode = tgen.routers()[dut]
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", isjson=True)
+
+ if ospf:
+ if 'ospf6' not in topo['routers'][dut]:
+ errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(
+ router)
+ return errormsg
+
+ show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf summary detail json",
+ isjson=True)
+ else:
+ if 'ospf' not in topo['routers'][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
+ router)
+ return errormsg
+
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json",
+ isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
@@ -1542,35 +1634,31 @@ def verify_ospf_summary(tgen, topo, dut, input_dict, expected=True):
# To find neighbor ip type
ospf_summary_data = input_dict
+
+ if ospf:
+ show_ospf_json = show_ospf_json['default']
+
for ospf_summ, summ_data in ospf_summary_data.items():
if ospf_summ not in show_ospf_json:
continue
- summary = ospf_summary_data[ospf_summ]["Summary address"]
+ summary = ospf_summary_data[ospf_summ]['Summary address']
+
if summary in show_ospf_json:
for summ in summ_data:
if summ_data[summ] == show_ospf_json[summary][summ]:
- logger.info(
- "[DUT: %s] OSPF summary %s:%s is %s",
- router,
- summary,
- summ,
- summ_data[summ],
- )
+ logger.info("[DUT: %s] OSPF summary %s:%s is %s",
+ router, summary, summ, summ_data[summ])
result = True
else:
- errormsg = (
- "[DUT: {}] OSPF summary {}:{} is %s, "
- "Expected is {}".format(
- router, summary, summ, show_ospf_json[summary][summ]
- )
- )
+ errormsg = ("[DUT: {}] OSPF summary {} : {} is {}, "
+ "Expected is {}".format(router, summary, summ,show_ospf_json[
+ summary][summ], summ_data[summ] ))
return errormsg
- logger.debug("Exiting API: verify_ospf_summary()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
-
@retry(retry_timeout=30)
def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
tag=None, metric=None, fib=None):
@@ -1627,31 +1715,34 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
found_routes = []
missing_routes = []
- if "static_routes" in input_dict[routerInput] or \
- "prefix" in input_dict[routerInput]:
+ if (
+ "static_routes" in input_dict[routerInput]
+ or "prefix" in input_dict[routerInput]
+ ):
if "prefix" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["prefix"]
else:
static_routes = input_dict[routerInput]["static_routes"]
-
for static_route in static_routes:
cmd = "{}".format(command)
cmd = "{} json".format(cmd)
- ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
+ ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
# Fix for PR 2644182
try:
- ospf_rib_json = ospf_rib_json['routes']
+ ospf_rib_json = ospf_rib_json["routes"]
except KeyError:
pass
# Verifying output dictionary ospf_rib_json is not empty
if bool(ospf_rib_json) is False:
- errormsg = "[DUT: {}] No routes found in OSPF6 route " \
+ errormsg = (
+ "[DUT: {}] No routes found in OSPF6 route "
"table".format(router)
+ )
return errormsg
network = static_route["network"]
@@ -1659,7 +1750,6 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
_tag = static_route.setdefault("tag", None)
_rtype = static_route.setdefault("routeType", None)
-
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
st_found = False
@@ -1668,7 +1758,7 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
- if _addr_type != 'ipv6':
+ if _addr_type != "ipv6":
continue
if st_rt in ospf_rib_json:
@@ -1681,17 +1771,26 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
next_hop = [next_hop]
for mnh in range(0, len(ospf_rib_json[st_rt])):
- if 'fib' in ospf_rib_json[st_rt][
- mnh]["nextHops"][0]:
- found_hops.append([rib_r[
- "ip"] for rib_r in ospf_rib_json[
- st_rt][mnh]["nextHops"]])
+ if (
+ "fib"
+ in ospf_rib_json[st_rt][mnh]["nextHops"][0]
+ ):
+ found_hops.append(
+ [
+ rib_r["ip"]
+ for rib_r in ospf_rib_json[st_rt][mnh][
+ "nextHops"
+ ]
+ ]
+ )
if found_hops[0]:
- missing_list_of_nexthops = \
- set(found_hops[0]).difference(next_hop)
- additional_nexthops_in_required_nhs = \
- set(next_hop).difference(found_hops[0])
+ missing_list_of_nexthops = set(
+ found_hops[0]
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops[0])
if additional_nexthops_in_required_nhs:
logger.info(
@@ -1699,13 +1798,18 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
"%s is not active for route %s in "
"RIB of router %s\n",
additional_nexthops_in_required_nhs,
- st_rt, dut)
+ st_rt,
+ dut,
+ )
errormsg = (
"Nexthop {} is not active"
" for route {} in RIB of router"
" {}\n".format(
- additional_nexthops_in_required_nhs,
- st_rt, dut))
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
return errormsg
else:
nh_found = True
@@ -1713,98 +1817,118 @@ def verify_ospf6_rib(tgen, dut, input_dict, next_hop=None,
elif next_hop and fib is None:
if type(next_hop) is not list:
next_hop = [next_hop]
- found_hops = [rib_r['nextHop'] for rib_r in
- ospf_rib_json[st_rt][
- "nextHops"]]
+ found_hops = [
+ rib_r["nextHop"]
+ for rib_r in ospf_rib_json[st_rt]["nextHops"]
+ ]
if found_hops:
- missing_list_of_nexthops = \
- set(found_hops).difference(next_hop)
- additional_nexthops_in_required_nhs = \
- set(next_hop).difference(found_hops)
+ missing_list_of_nexthops = set(
+ found_hops
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops)
if additional_nexthops_in_required_nhs:
logger.info(
- "Missing nexthop %s for route"\
- " %s in RIB of router %s\n", \
- additional_nexthops_in_required_nhs, \
- st_rt, dut)
- errormsg=("Nexthop {} is Missing for "\
- "route {} in RIB of router {}\n".format(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
- st_rt, dut))
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
return errormsg
else:
nh_found = True
if _rtype:
- if "destinationType" not in ospf_rib_json[
- st_rt]:
- errormsg = ("[DUT: {}]: destinationType missing"
- "for route {} in OSPF RIB \n".\
- format(dut, st_rt))
+ if "destinationType" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: destinationType missing"
+ "for route {} in OSPF RIB \n".format(dut, st_rt)
+ )
return errormsg
- elif _rtype != ospf_rib_json[st_rt][
- "destinationType"]:
- errormsg = ("[DUT: {}]: destinationType mismatch"
- "for route {} in OSPF RIB \n".\
- format(dut, st_rt))
+ elif _rtype != ospf_rib_json[st_rt]["destinationType"]:
+ errormsg = (
+ "[DUT: {}]: destinationType mismatch"
+ "for route {} in OSPF RIB \n".format(dut, st_rt)
+ )
return errormsg
else:
- logger.info("DUT: {}]: Found destinationType {}"
- "for route {}".\
- format(dut, _rtype, st_rt))
+ logger.info(
+ "DUT: {}]: Found destinationType {}"
+ "for route {}".format(dut, _rtype, st_rt)
+ )
if tag:
- if "tag" not in ospf_rib_json[
- st_rt]:
- errormsg = ("[DUT: {}]: tag is not"
- " present for"
- " route {} in RIB \n".\
- format(dut, st_rt
- ))
+ if "tag" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: tag is not"
+ " present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
return errormsg
- if _tag != ospf_rib_json[
- st_rt]["tag"]:
- errormsg = ("[DUT: {}]: tag value {}"
- " is not matched for"
- " route {} in RIB \n".\
- format(dut, _tag, st_rt,
- ))
+ if _tag != ospf_rib_json[st_rt]["tag"]:
+ errormsg = (
+ "[DUT: {}]: tag value {}"
+ " is not matched for"
+ " route {} in RIB \n".format(
+ dut,
+ _tag,
+ st_rt,
+ )
+ )
return errormsg
if metric is not None:
- if "type2cost" not in ospf_rib_json[
- st_rt]:
- errormsg = ("[DUT: {}]: metric is"
- " not present for"
- " route {} in RIB \n".\
- format(dut, st_rt))
+ if "type2cost" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: metric is"
+ " not present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
return errormsg
- if metric != ospf_rib_json[
- st_rt]["type2cost"]:
- errormsg = ("[DUT: {}]: metric value "
- "{} is not matched for "
- "route {} in RIB \n".\
- format(dut, metric, st_rt,
- ))
+ if metric != ospf_rib_json[st_rt]["type2cost"]:
+ errormsg = (
+ "[DUT: {}]: metric value "
+ "{} is not matched for "
+ "route {} in RIB \n".format(
+ dut,
+ metric,
+ st_rt,
+ )
+ )
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
- logger.info("[DUT: {}]: Found next_hop {} for all OSPF"
- " routes in RIB".format(router, next_hop))
+ logger.info(
+ "[DUT: {}]: Found next_hop {} for all OSPF"
+ " routes in RIB".format(router, next_hop)
+ )
if len(missing_routes) > 0:
- errormsg = ("[DUT: {}]: Missing route in RIB, "
- "routes: {}".\
- format(dut, missing_routes))
+ errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
+ dut, missing_routes
+ )
return errormsg
if found_routes:
- logger.info("[DUT: %s]: Verified routes in RIB, found"
- " routes are: %s\n", dut, found_routes)
+ logger.info(
+ "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
+ dut,
+ found_routes,
+ )
result = True
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
@@ -1855,15 +1979,16 @@ def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None):
result = False
for router, rnode in tgen.routers().iteritems():
- if 'ospf6' not in topo['routers'][router]:
+ if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF interface on router %s:", router)
- show_ospf_json = run_frr_cmd(rnode, "show ipv6 ospf interface json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf interface json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
@@ -1873,32 +1998,49 @@ def verify_ospf6_interface(tgen, topo, dut=None,lan=False, input_dict=None):
# To find neighbor ip type
ospf_intf_data = input_dict[router]["links"]
for ospf_intf, intf_data in ospf_intf_data.items():
- intf = topo['routers'][router]['links'][ospf_intf]['interface']
- if intf in show_ospf_json:
- for intf_attribute in intf_data['ospf6']:
- if intf_data['ospf6'][intf_attribute] is not list:
- if intf_data['ospf6'][intf_attribute] == show_ospf_json[
- intf][intf_attribute]:
- logger.info("[DUT: %s] OSPF6 interface %s: %s is %s",
- router, intf, intf_attribute, intf_data['ospf6'][
- intf_attribute])
- elif intf_data['ospf6'][intf_attribute] is list:
+ intf = topo["routers"][router]["links"][ospf_intf]["interface"]
+ if intf in show_ospf_json:
+ for intf_attribute in intf_data["ospf6"]:
+ if intf_data["ospf6"][intf_attribute] is not list:
+ if (
+ intf_data["ospf6"][intf_attribute]
+ == show_ospf_json[intf][intf_attribute]
+ ):
+ logger.info(
+ "[DUT: %s] OSPF6 interface %s: %s is %s",
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf6"][intf_attribute],
+ )
+ elif intf_data["ospf6"][intf_attribute] is list:
for addr_list in len(show_ospf_json[intf][intf_attribute]):
- if show_ospf_json[intf][intf_attribute][addr_list][
- 'address'].split('/')[0] == intf_data['ospf6'][
- 'internetAddress'][0]['address']:
- break
+ if (
+ show_ospf_json[intf][intf_attribute][addr_list][
+ "address"
+ ].split("/")[0]
+ == intf_data["ospf6"]["internetAddress"][0]["address"]
+ ):
+ break
else:
- errormsg= "[DUT: {}] OSPF6 interface {}: {} is {}, \
- Expected is {}".format(router, intf, intf_attribute,
- intf_data['ospf6'][intf_attribute], intf_data['ospf6'][
- intf_attribute])
+ errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
+ Expected is {}".format(
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf6"][intf_attribute],
+ intf_data["ospf6"][intf_attribute],
+ )
return errormsg
else:
- errormsg= "[DUT: {}] OSPF6 interface {}: {} is {}, \
- Expected is {}".format(router, intf, intf_attribute,
- intf_data['ospf6'][intf_attribute], intf_data['ospf6'][
- intf_attribute])
+ errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
+ Expected is {}".format(
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf6"][intf_attribute],
+ intf_data["ospf6"][intf_attribute],
+ )
return errormsg
result = True
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
@@ -1956,16 +2098,14 @@ def verify_ospf6_database(tgen, topo, dut, input_dict):
router = dut
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- if 'ospf' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
- dut)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
@@ -1973,167 +2113,209 @@ def verify_ospf6_database(tgen, topo, dut, input_dict):
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
- ospf_external_lsa = input_dict.setdefault(
- 'asExternalLinkStates', None)
+ ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
if ospf_db_data:
- for ospf_area, area_lsa in ospf_db_data.items():
- if ospf_area in show_ospf_json['areas']:
- if 'routerLinkStates' in area_lsa:
- for lsa in area_lsa['routerLinkStates']:
- for rtrlsa in show_ospf_json['areas'][ospf_area][
- 'routerLinkStates']:
- if lsa['lsaId'] == rtrlsa['lsaId'] and \
- lsa['advertisedRouter'] == rtrlsa[
- 'advertisedRouter']:
- result = True
- break
- if result:
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:Router "
- "LSA %s", router, ospf_area, lsa)
+ for ospf_area, area_lsa in ospf_db_data.items():
+ if ospf_area in show_ospf_json["areas"]:
+ if "routerLinkStates" in area_lsa:
+ for lsa in area_lsa["routerLinkStates"]:
+ for rtrlsa in show_ospf_json["areas"][ospf_area][
+ "routerLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == rtrlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == rtrlsa["advertisedRouter"]
+ ):
+ result = True
break
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB area {}: expected" \
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
" Router LSA is {}".format(router, ospf_area, lsa)
- return errormsg
+ )
+ return errormsg
- if 'networkLinkStates' in area_lsa:
- for lsa in area_lsa['networkLinkStates']:
- for netlsa in show_ospf_json['areas'][ospf_area][
- 'networkLinkStates']:
- if lsa in show_ospf_json['areas'][ospf_area][
- 'networkLinkStates']:
- if lsa['lsaId'] == netlsa['lsaId'] and \
- lsa['advertisedRouter'] == netlsa[
- 'advertisedRouter']:
- result = True
- break
- if result:
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:Network "
- "LSA %s", router, ospf_area, lsa)
- break
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB area {}: expected" \
+ if "networkLinkStates" in area_lsa:
+ for lsa in area_lsa["networkLinkStates"]:
+ for netlsa in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]
+ ):
+ if (
+ lsa["lsaId"] == netlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == netlsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
- return errormsg
+ )
+ return errormsg
- if 'summaryLinkStates' in area_lsa:
- for lsa in area_lsa['summaryLinkStates']:
- for t3lsa in show_ospf_json['areas'][ospf_area][
- 'summaryLinkStates']:
- if lsa['lsaId'] == t3lsa['lsaId'] and \
- lsa['advertisedRouter'] == t3lsa[
- 'advertisedRouter']:
- result = True
- break
- if result:
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:Summary "
- "LSA %s", router, ospf_area, lsa)
+ if "summaryLinkStates" in area_lsa:
+ for lsa in area_lsa["summaryLinkStates"]:
+ for t3lsa in show_ospf_json["areas"][ospf_area][
+ "summaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t3lsa["lsaId"]
+ and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
+ ):
+ result = True
break
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB area {}: expected" \
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
- return errormsg
+ )
+ return errormsg
- if 'nssaExternalLinkStates' in area_lsa:
- for lsa in area_lsa['nssaExternalLinkStates']:
- for t7lsa in show_ospf_json['areas'][ospf_area][
- 'nssaExternalLinkStates']:
- if lsa['lsaId'] == t7lsa['lsaId'] and \
- lsa['advertisedRouter'] == t7lsa[
- 'advertisedRouter']:
- result = True
- break
- if result:
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:Type7 "
- "LSA %s", router, ospf_area, lsa)
+ if "nssaExternalLinkStates" in area_lsa:
+ for lsa in area_lsa["nssaExternalLinkStates"]:
+ for t7lsa in show_ospf_json["areas"][ospf_area][
+ "nssaExternalLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t7lsa["lsaId"]
+ and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
+ ):
+ result = True
break
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB area {}: expected" \
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
" Type7 LSA is {}".format(router, ospf_area, lsa)
- return errormsg
+ )
+ return errormsg
- if 'asbrSummaryLinkStates' in area_lsa:
- for lsa in area_lsa['asbrSummaryLinkStates']:
- for t4lsa in show_ospf_json['areas'][ospf_area][
- 'asbrSummaryLinkStates']:
- if lsa['lsaId'] == t4lsa['lsaId'] and \
- lsa['advertisedRouter'] == t4lsa[
- 'advertisedRouter']:
- result = True
- break
- if result:
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:ASBR Summary "
- "LSA %s", router, ospf_area, lsa)
+ if "asbrSummaryLinkStates" in area_lsa:
+ for lsa in area_lsa["asbrSummaryLinkStates"]:
+ for t4lsa in show_ospf_json["areas"][ospf_area][
+ "asbrSummaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t4lsa["lsaId"]
+ and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
+ ):
result = True
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB area {}: expected" \
- " ASBR Summary LSA is {}".format(
- router, ospf_area, lsa)
- return errormsg
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
- if 'linkLocalOpaqueLsa' in area_lsa:
- for lsa in area_lsa['linkLocalOpaqueLsa']:
- try:
- for lnklsa in show_ospf_json['areas'][ospf_area][
- 'linkLocalOpaqueLsa']:
- if lsa['lsaId'] in lnklsa['lsaId'] and \
- 'linkLocalOpaqueLsa' in show_ospf_json[
- 'areas'][ospf_area]:
- logger.info((
- "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
- "%s", ospf_area, lsa))
- result = True
- else:
- errormsg = ("[DUT: FRR] OSPF LSDB area: {} "
- "expected Opaque-LSA is {}, Found is {}".format(
- ospf_area, lsa, show_ospf_json))
- raise ValueError (errormsg)
- return errormsg
- except KeyError:
- errormsg = ("[DUT: FRR] linkLocalOpaqueLsa Not "
- "present")
- return errormsg
+ if "linkLocalOpaqueLsa" in area_lsa:
+ for lsa in area_lsa["linkLocalOpaqueLsa"]:
+ try:
+ for lnklsa in show_ospf_json["areas"][ospf_area][
+ "linkLocalOpaqueLsa"
+ ]:
+ if (
+ lsa["lsaId"] in lnklsa["lsaId"]
+ and "linkLocalOpaqueLsa"
+ in show_ospf_json["areas"][ospf_area]
+ ):
+ logger.info(
+ (
+ "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
+ "%s",
+ ospf_area,
+ lsa,
+ )
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: FRR] OSPF LSDB area: {} "
+ "expected Opaque-LSA is {}, Found is {}".format(
+ ospf_area, lsa, show_ospf_json
+ )
+ )
+ raise ValueError(errormsg)
+ return errormsg
+ except KeyError:
+ errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
+ return errormsg
if ospf_external_lsa:
- for lsa in ospf_external_lsa:
- try:
- for t5lsa in show_ospf_json['asExternalLinkStates']:
- if lsa['lsaId'] == t5lsa['lsaId'] and \
- lsa['advertisedRouter'] == t5lsa[
- 'advertisedRouter']:
- result = True
- break
- except KeyError:
- result = False
- if result:
- logger.info(
- "[DUT: %s] OSPF LSDB:External LSA %s",
- router, lsa)
- result = True
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB : expected" \
- " External LSA is {}".format(router, lsa)
- return errormsg
+ for lsa in ospf_external_lsa:
+ try:
+ for t5lsa in show_ospf_json["asExternalLinkStates"]:
+ if (
+ lsa["lsaId"] == t5lsa["lsaId"]
+ and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ except KeyError:
+ result = False
+ if result:
+ logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB : expected"
+ " External LSA is {}".format(router, lsa)
+ )
+ return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
-
-def config_ospf6_interface (tgen, topo, input_dict=None, build=False,
- load_config=True):
+def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config=True):
"""
API to configure ospf on router.
@@ -2172,6 +2354,9 @@ def config_ospf6_interface (tgen, topo, input_dict=None, build=False,
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]['links'].keys():
@@ -2180,17 +2365,17 @@ def config_ospf6_interface (tgen, topo, input_dict=None, build=False,
"input_dict, passed input_dict %s", router,
str(input_dict))
continue
- ospf_data = input_dict[router]['links'][lnk]['ospf6']
+ ospf_data = input_dict[router]["links"][lnk]["ospf6"]
data_ospf_area = ospf_data.setdefault("area", None)
- data_ospf_auth = ospf_data.setdefault("authentication", None)
+ data_ospf_auth = ospf_data.setdefault("hash-algo", None)
data_ospf_dr_priority = ospf_data.setdefault("priority", None)
data_ospf_cost = ospf_data.setdefault("cost", None)
data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None)
try:
- intf = topo['routers'][router]['links'][lnk]['interface']
+ intf = topo["routers"][router]["links"][lnk]["interface"]
except KeyError:
- intf = topo['switches'][router]['links'][lnk]['interface']
+ intf = topo["switches"][router]["links"][lnk]["interface"]
# interface
cmd = "interface {}".format(intf)
@@ -2201,34 +2386,117 @@ def config_ospf6_interface (tgen, topo, input_dict=None, build=False,
cmd = "ipv6 ospf area {}".format(data_ospf_area)
config_data.append(cmd)
+ # interface ospf auth
+ if data_ospf_auth:
+ cmd = "ipv6 ospf6 authentication"
+
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+
+ if "hash-algo" in ospf_data:
+ cmd = "{} key-id {} hash-algo {} key {}".format(
+ cmd,
+ ospf_data["key-id"],
+ ospf_data["hash-algo"],
+ ospf_data["key"],
+ )
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
# interface ospf dr priority
if data_ospf_dr_priority:
- cmd = "ipv6 ospf priority {}".format(
- ospf_data["priority"])
- if 'del_action' in ospf_data:
+ cmd = "ipv6 ospf priority {}".format(ospf_data["priority"])
+ if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf cost
if data_ospf_cost:
- cmd = "ipv6 ospf cost {}".format(
- ospf_data["cost"])
- if 'del_action' in ospf_data:
+ cmd = "ipv6 ospf cost {}".format(ospf_data["cost"])
+ if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf mtu
if data_ospf_mtu:
cmd = "ipv6 ospf mtu-ignore"
- if 'del_action' in ospf_data:
+ if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if build:
return config_data
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+@retry(retry_timeout=20)
+def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
+ """
+ This API is used to vreify gr helper using command
+ show ip ospf graceful-restart helper
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : topology descriptions
+ * 'dut' : router
+ * 'input_dict' - values to be verified
+
+ Usage:
+ -------
+ input_dict = {
+ "helperSupport":"Disabled",
+ "strictLsaCheck":"Enabled",
+ "restartSupoort":"Planned and Unplanned Restarts",
+ "supportedGracePeriod":1800
+ }
+ result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
+
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+
+ if 'ospf' not in topo['routers'][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
+ dut)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+ logger.info("Verifying OSPF GR details on router %s:", dut)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json",
+ isjson=True)
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ raise ValueError (errormsg)
+ return errormsg
+
+ for ospf_gr, gr_data in input_dict.items():
+ try:
+ if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
+ logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr,
+ show_ospf_json[ospf_gr])
+ result = True
else:
- result = create_common_configuration(tgen, router, config_data,
- "interface_config",
- build=build)
+ errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
+ "is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[
+ ospf_gr]))
+ raise ValueError (errormsg)
+ return errormsg
+
+ except KeyError:
+ errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr))
+ return errormsg
+
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result