summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py689
1 files changed, 380 insertions, 309 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index 9d6b8fa691..9f3d4841b0 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -26,12 +26,15 @@ import ipaddr
from lib.topotest import frr_unicode
# Import common_config to use commomnly used APIs
-from lib.common_config import (create_common_configuration,
- InvalidCLIError, retry,
- generate_ips,
- check_address_types,
- validate_ip_address,
- run_frr_cmd)
+from lib.common_config import (
+ create_common_configuration,
+ InvalidCLIError,
+ retry,
+ generate_ips,
+ check_address_types,
+ validate_ip_address,
+ run_frr_cmd,
+)
LOGDIR = "/tmp/topotests/"
TMPDIR = None
@@ -40,9 +43,8 @@ TMPDIR = None
# Configure procs
################################
-def create_router_ospf(
- tgen, topo, input_dict=None, build=False,
- load_config=True):
+
+def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=True):
"""
API to configure ospf on router.
@@ -84,19 +86,15 @@ def create_router_ospf(
logger.debug("Router %s: 'ospf' not present in input_dict", router)
continue
- result = __create_ospf_global(
- tgen, input_dict, router, build, load_config)
+ result = __create_ospf_global(tgen, input_dict, router, build, load_config)
if result is True:
ospf_data = input_dict[router]["ospf"]
-
logger.debug("Exiting lib API: create_router_ospf()")
return result
-def __create_ospf_global(
- tgen, input_dict, router, build=False,
- load_config=True):
+def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True):
"""
Helper API to create ospf global configuration.
@@ -121,9 +119,9 @@ def __create_ospf_global(
del_ospf_action = ospf_data.setdefault("delete", False)
if del_ospf_action:
config_data = ["no router ospf"]
- result = create_common_configuration(tgen, router, config_data,
- "ospf", build,
- load_config)
+ result = create_common_configuration(
+ tgen, router, config_data, "ospf", build, load_config
+ )
return result
config_data = []
@@ -137,34 +135,33 @@ def __create_ospf_global(
if del_router_id:
config_data.append("no ospf router-id")
if router_id:
- config_data.append("ospf router-id {}".format(
- router_id))
+ config_data.append("ospf router-id {}".format(router_id))
# redistribute command
redistribute_data = ospf_data.setdefault("redistribute", {})
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
- logger.debug("Router %s: 'redist_type' not present in "
- "input_dict", router)
+ logger.debug(
+ "Router %s: 'redist_type' not present in " "input_dict", router
+ )
else:
- cmd = "redistribute {}".format(
- redistribute["redist_type"])
+ cmd = "redistribute {}".format(redistribute["redist_type"])
for red_type in redistribute_data:
if "route_map" in red_type:
- cmd = cmd + " route-map {}".format(red_type[
- 'route_map'])
+ cmd = cmd + " route-map {}".format(red_type["route_map"])
del_action = redistribute.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- #area information
+ # area information
area_data = ospf_data.setdefault("area", {})
if area_data:
for area in area_data:
if "id" not in area:
- logger.debug("Router %s: 'area id' not present in "
- "input_dict", router)
+ logger.debug(
+ "Router %s: 'area id' not present in " "input_dict", router
+ )
else:
cmd = "area {}".format(area["id"])
@@ -175,19 +172,21 @@ def __create_ospf_global(
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- result = create_common_configuration(tgen, router, config_data,
- "ospf", build, load_config)
+ result = create_common_configuration(
+ tgen, router, config_data, "ospf", build, load_config
+ )
# summary information
summary_data = ospf_data.setdefault("summary-address", {})
if summary_data:
for summary in summary_data:
if "prefix" not in summary:
- logger.debug("Router %s: 'summary-address' not present in "
- "input_dict", router)
+ logger.debug(
+ "Router %s: 'summary-address' not present in " "input_dict",
+ router,
+ )
else:
- cmd = "summary {}/{}".format(summary["prefix"], summary[
- "mask"])
+ cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
_tag = summary.setdefault("tag", None)
if _tag:
@@ -201,8 +200,9 @@ def __create_ospf_global(
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- result = create_common_configuration(tgen, router, config_data,
- "ospf", build, load_config)
+ result = create_common_configuration(
+ tgen, router, config_data, "ospf", build, load_config
+ )
except InvalidCLIError:
# Traceback
@@ -214,9 +214,7 @@ def __create_ospf_global(
return result
-def create_router_ospf6(
- tgen, topo, input_dict=None, build=False,
- load_config=True):
+def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True):
"""
API to configure ospf on router
@@ -253,16 +251,13 @@ def create_router_ospf6(
logger.debug("Router %s: 'ospf' not present in input_dict", router)
continue
- result = __create_ospf_global(
- tgen, input_dict, router, build, load_config)
+ result = __create_ospf_global(tgen, input_dict, router, build, load_config)
logger.debug("Exiting lib API: create_router_ospf()")
return result
-def __create_ospf6_global(
- tgen, input_dict, router, build=False,
- load_config=True):
+def __create_ospf6_global(tgen, input_dict, router, build=False, load_config=True):
"""
Helper API to create ospf global configuration.
@@ -286,9 +281,9 @@ def __create_ospf6_global(
del_ospf_action = ospf_data.setdefault("delete", False)
if del_ospf_action:
config_data = ["no ipv6 router ospf"]
- result = create_common_configuration(tgen, router, config_data,
- "ospf", build,
- load_config)
+ result = create_common_configuration(
+ tgen, router, config_data, "ospf", build, load_config
+ )
return result
config_data = []
@@ -301,11 +296,11 @@ def __create_ospf6_global(
if del_router_id:
config_data.append("no ospf router-id")
if router_id:
- config_data.append("ospf router-id {}".format(
- router_id))
+ config_data.append("ospf router-id {}".format(router_id))
- result = create_common_configuration(tgen, router, config_data,
- "ospf", build, load_config)
+ result = create_common_configuration(
+ tgen, router, config_data, "ospf", build, load_config
+ )
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
@@ -315,8 +310,8 @@ def __create_ospf6_global(
logger.debug("Exiting lib API: create_ospf_global()")
return result
-def config_ospf_interface (tgen, topo, input_dict=None, build=False,
- load_config=True):
+
+def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=True):
"""
API to configure ospf on router.
@@ -356,22 +351,25 @@ def config_ospf_interface (tgen, topo, input_dict=None, build=False,
input_dict = deepcopy(input_dict)
for router in input_dict.keys():
config_data = []
- for lnk in input_dict[router]['links'].keys():
- if "ospf" not in input_dict[router]['links'][lnk]:
- logger.debug("Router %s: ospf configs is not present in"
- "input_dict, passed input_dict", router,
- input_dict)
+ for lnk in input_dict[router]["links"].keys():
+ if "ospf" not in input_dict[router]["links"][lnk]:
+ logger.debug(
+ "Router %s: ospf configs is not present in"
+ "input_dict, passed input_dict",
+ router,
+ input_dict,
+ )
continue
- ospf_data = input_dict[router]['links'][lnk]['ospf']
+ ospf_data = input_dict[router]["links"][lnk]["ospf"]
data_ospf_area = ospf_data.setdefault("area", None)
data_ospf_auth = ospf_data.setdefault("authentication", None)
data_ospf_dr_priority = ospf_data.setdefault("priority", None)
data_ospf_cost = ospf_data.setdefault("cost", None)
try:
- intf = topo['routers'][router]['links'][lnk]['interface']
+ intf = topo["routers"][router]["links"][lnk]["interface"]
except KeyError:
- intf = topo['switches'][router]['links'][lnk]['interface']
+ intf = topo["switches"][router]["links"][lnk]["interface"]
# interface
cmd = "interface {}".format(intf)
@@ -383,58 +381,60 @@ def config_ospf_interface (tgen, topo, input_dict=None, build=False,
config_data.append(cmd)
# interface ospf auth
if data_ospf_auth:
- if data_ospf_auth == 'null':
+ if data_ospf_auth == "null":
cmd = "ip ospf authentication null"
- elif data_ospf_auth == 'message-digest':
+ elif data_ospf_auth == "message-digest":
cmd = "ip ospf authentication message-digest"
else:
cmd = "ip ospf authentication"
- if 'del_action' in ospf_data:
+ if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "message-digest-key" in ospf_data:
cmd = "ip ospf message-digest-key {} md5 {}".format(
- ospf_data["message-digest-key"],ospf_data[
- "authentication-key"])
- if 'del_action' in ospf_data:
+ ospf_data["message-digest-key"], ospf_data["authentication-key"]
+ )
+ if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- if "authentication-key" in ospf_data and \
- "message-digest-key" not in ospf_data:
- cmd = "ip ospf authentication-key {}".format(ospf_data[
- "authentication-key"])
- if 'del_action' in ospf_data:
+ if (
+ "authentication-key" in ospf_data
+ and "message-digest-key" not in ospf_data
+ ):
+ cmd = "ip ospf authentication-key {}".format(
+ ospf_data["authentication-key"]
+ )
+ if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf dr priority
if data_ospf_dr_priority in ospf_data:
- cmd = "ip ospf priority {}".format(
- ospf_data["priority"])
- if 'del_action' in ospf_data:
+ cmd = "ip ospf priority {}".format(ospf_data["priority"])
+ if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf cost
if data_ospf_cost in ospf_data:
- cmd = "ip ospf cost {}".format(
- ospf_data["cost"])
- if 'del_action' in ospf_data:
+ cmd = "ip ospf cost {}".format(ospf_data["cost"])
+ if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if build:
return config_data
else:
- result = create_common_configuration(tgen, router, config_data,
- "interface_config",
- build=build)
+ result = create_common_configuration(
+ tgen, router, config_data, "interface_config", build=build
+ )
logger.debug("Exiting lib API: create_igmp_config()")
return result
+
def clear_ospf(tgen, router):
"""
This API is to clear ospf neighborship by running
@@ -517,15 +517,16 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
result = False
if input_dict:
for router, rnode in tgen.routers().items():
- if 'ospf' not in topo['routers'][router]:
+ if "ospf" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
- show_ospf_json = run_frr_cmd(rnode,
- "show ip ospf neighbor all json", isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf neighbor all json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
@@ -533,126 +534,134 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
return errormsg
ospf_data_list = input_dict[router]["ospf"]
- ospf_nbr_list = ospf_data_list['neighbors']
+ ospf_nbr_list = ospf_data_list["neighbors"]
for ospf_nbr, nbr_data in ospf_nbr_list.items():
- data_ip = topo['routers'][ospf_nbr]['links']
- data_rid = topo['routers'][ospf_nbr]['ospf']['router_id']
+ data_ip = topo["routers"][ospf_nbr]["links"]
+ data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
- for switch in topo['switches']:
- if 'ospf' in topo['switches'][switch]['links'][router]:
- neighbor_ip = data_ip[switch]['ipv4'].split("/")[0]
+ for switch in topo["switches"]:
+ if "ospf" in topo["switches"][switch]["links"][router]:
+ neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
else:
continue
else:
- neighbor_ip = data_ip[router]['ipv4'].split("/")[0]
+ neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
try:
- nh_state = show_ospf_json[nbr_rid][0][
- 'state'].split('/')[0]
- intf_state = show_ospf_json[nbr_rid][0][
- 'state'].split('/')[1]
+ nh_state = show_ospf_json[nbr_rid][0]["state"].split("/")[0]
+ intf_state = show_ospf_json[nbr_rid][0]["state"].split("/")[1]
except KeyError:
- errormsg = "[DUT: {}] OSPF peer {} missing".format(router,
- nbr_rid)
+ errormsg = "[DUT: {}] OSPF peer {} missing".format(router, nbr_rid)
return errormsg
- nbr_state = nbr_data.setdefault("state",None)
- nbr_role = nbr_data.setdefault("role",None)
+ nbr_state = nbr_data.setdefault("state", None)
+ nbr_role = nbr_data.setdefault("role", None)
if nbr_state:
if nbr_state == nh_state:
- logger.info("[DUT: {}] OSPF Nbr is {}:{} State {}".format
- (router, ospf_nbr, nbr_rid, nh_state))
+ logger.info(
+ "[DUT: {}] OSPF Nbr is {}:{} State {}".format(
+ router, ospf_nbr, nbr_rid, nh_state
+ )
+ )
result = True
else:
- errormsg = ("[DUT: {}] OSPF is not Converged, neighbor"
- " state is {}".format(router, nh_state))
+ errormsg = (
+ "[DUT: {}] OSPF is not Converged, neighbor"
+ " state is {}".format(router, nh_state)
+ )
return errormsg
if nbr_role:
if nbr_role == intf_state:
- logger.info("[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
- router, ospf_nbr, nbr_rid, nbr_role))
+ logger.info(
+ "[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
+ router, ospf_nbr, nbr_rid, nbr_role
+ )
+ )
else:
- errormsg = ("[DUT: {}] OSPF is not Converged with rid"
- "{}, role is {}".format(router, nbr_rid, intf_state))
+ errormsg = (
+ "[DUT: {}] OSPF is not Converged with rid"
+ "{}, role is {}".format(router, nbr_rid, intf_state)
+ )
return errormsg
continue
else:
for router, rnode in tgen.routers().items():
- if 'ospf' not in topo['routers'][router]:
+ if "ospf" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
- show_ospf_json = run_frr_cmd(rnode,
- "show ip ospf neighbor all json", isjson=True)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf neighbor all json", isjson=True
+ )
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
ospf_data_list = topo["routers"][router]["ospf"]
- ospf_neighbors = ospf_data_list['neighbors']
+ ospf_neighbors = ospf_data_list["neighbors"]
total_peer = 0
total_peer = len(ospf_neighbors.keys())
no_of_ospf_nbr = 0
- ospf_nbr_list = ospf_data_list['neighbors']
+ ospf_nbr_list = ospf_data_list["neighbors"]
no_of_peer = 0
for ospf_nbr, nbr_data in ospf_nbr_list.items():
if nbr_data:
- data_ip = topo['routers'][nbr_data["nbr"]]['links']
- data_rid = topo['routers'][nbr_data["nbr"]][
- 'ospf']['router_id']
+ data_ip = topo["routers"][nbr_data["nbr"]]["links"]
+ data_rid = topo["routers"][nbr_data["nbr"]]["ospf"]["router_id"]
else:
- data_ip = topo['routers'][ospf_nbr]['links']
- data_rid = topo['routers'][ospf_nbr]['ospf']['router_id']
+ data_ip = topo["routers"][ospf_nbr]["links"]
+ data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
- for switch in topo['switches']:
- if 'ospf' in topo['switches'][switch]['links'][router]:
- neighbor_ip = data_ip[switch]['ipv4'].split("/")[0]
+ for switch in topo["switches"]:
+ if "ospf" in topo["switches"][switch]["links"][router]:
+ neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
else:
continue
else:
- neighbor_ip = data_ip[router]['ipv4'].split("/")[0]
+ neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
try:
- nh_state = show_ospf_json[nbr_rid][0][
- 'state'].split('/')[0]
+ nh_state = show_ospf_json[nbr_rid][0]["state"].split("/")[0]
except KeyError:
- errormsg = "[DUT: {}] OSPF peer {} missing,from "\
- "{} ".format(router,
- nbr_rid, ospf_nbr)
+ errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
+ router, nbr_rid, ospf_nbr
+ )
return errormsg
- if nh_state == 'Full':
+ if nh_state == "Full":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("[DUT: {}] OSPF is Converged".format(router))
result = True
else:
- errormsg = ("[DUT: {}] OSPF is not Converged".format(router))
+ errormsg = "[DUT: {}] OSPF is not Converged".format(router)
return errormsg
logger.debug("Exiting API: verify_ospf_neighbor()")
return result
+
@retry(attempts=21, wait=2, return_is_str=True)
-def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
- tag=None, metric=None, fib=None):
+def verify_ospf_rib(
+ tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
+):
"""
This API is to verify ospf routes by running
show ip ospf route command.
@@ -706,25 +715,28 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
found_routes = []
missing_routes = []
- if "static_routes" in input_dict[routerInput] or \
- "prefix" in input_dict[routerInput]:
+ if (
+ "static_routes" in input_dict[routerInput]
+ or "prefix" in input_dict[routerInput]
+ ):
if "prefix" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["prefix"]
else:
static_routes = input_dict[routerInput]["static_routes"]
-
for static_route in static_routes:
cmd = "{}".format(command)
cmd = "{} json".format(cmd)
- ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
+ ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary ospf_rib_json is not empty
if bool(ospf_rib_json) is False:
- errormsg = "[DUT: {}] No routes found in OSPF route " \
+ errormsg = (
+ "[DUT: {}] No routes found in OSPF route "
"table".format(router)
+ )
return errormsg
network = static_route["network"]
@@ -732,7 +744,6 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
_tag = static_route.setdefault("tag", None)
_rtype = static_route.setdefault("routeType", None)
-
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
st_found = False
@@ -742,7 +753,7 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
st_rt = str(ipaddr.IPNetwork(frr_unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
- if _addr_type != 'ipv4':
+ if _addr_type != "ipv4":
continue
if st_rt in ospf_rib_json:
@@ -754,17 +765,26 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
next_hop = [next_hop]
for mnh in range(0, len(ospf_rib_json[st_rt])):
- if 'fib' in ospf_rib_json[st_rt][
- mnh]["nexthops"][0]:
- found_hops.append([rib_r[
- "ip"] for rib_r in ospf_rib_json[
- st_rt][mnh]["nexthops"]])
+ if (
+ "fib"
+ in ospf_rib_json[st_rt][mnh]["nexthops"][0]
+ ):
+ found_hops.append(
+ [
+ rib_r["ip"]
+ for rib_r in ospf_rib_json[st_rt][mnh][
+ "nexthops"
+ ]
+ ]
+ )
if found_hops[0]:
- missing_list_of_nexthops = \
- set(found_hops[0]).difference(next_hop)
- additional_nexthops_in_required_nhs = \
- set(next_hop).difference(found_hops[0])
+ missing_list_of_nexthops = set(
+ found_hops[0]
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops[0])
if additional_nexthops_in_required_nhs:
logger.info(
@@ -772,13 +792,18 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
"%s is not active for route %s in "
"RIB of router %s\n",
additional_nexthops_in_required_nhs,
- st_rt, dut)
+ st_rt,
+ dut,
+ )
errormsg = (
"Nexthop {} is not active"
" for route {} in RIB of router"
" {}\n".format(
- additional_nexthops_in_required_nhs,
- st_rt, dut))
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
return errormsg
else:
nh_found = True
@@ -786,99 +811,111 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
elif next_hop and fib is None:
if type(next_hop) is not list:
next_hop = [next_hop]
- found_hops = [rib_r["ip"] for rib_r in
- ospf_rib_json[st_rt][
- "nexthops"]]
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in ospf_rib_json[st_rt]["nexthops"]
+ ]
if found_hops:
- missing_list_of_nexthops = \
- set(found_hops).difference(next_hop)
- additional_nexthops_in_required_nhs = \
- set(next_hop).difference(found_hops)
+ missing_list_of_nexthops = set(
+ found_hops
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops)
if additional_nexthops_in_required_nhs:
logger.info(
- "Missing nexthop %s for route"\
- " %s in RIB of router %s\n", \
- additional_nexthops_in_required_nhs, \
- st_rt, dut)
- errormsg=("Nexthop {} is Missing for "\
- "route {} in RIB of router {}\n".format(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
- st_rt, dut))
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
return errormsg
else:
nh_found = True
if _rtype:
- if "routeType" not in ospf_rib_json[
- st_rt]:
- errormsg = ("[DUT: {}]: routeType missing"
- "for route {} in OSPF RIB \n".\
- format(dut, st_rt))
+ if "routeType" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: routeType missing"
+ "for route {} in OSPF RIB \n".format(dut, st_rt)
+ )
return errormsg
- elif _rtype != ospf_rib_json[st_rt][
- "routeType"]:
- errormsg = ("[DUT: {}]: routeType mismatch"
- "for route {} in OSPF RIB \n".\
- format(dut, st_rt))
+ elif _rtype != ospf_rib_json[st_rt]["routeType"]:
+ errormsg = (
+ "[DUT: {}]: routeType mismatch"
+ "for route {} in OSPF RIB \n".format(dut, st_rt)
+ )
return errormsg
else:
- logger.info("DUT: {}]: Found routeType {}"
- "for route {}".\
- format(dut, _rtype, st_rt))
+ logger.info(
+ "DUT: {}]: Found routeType {}"
+ "for route {}".format(dut, _rtype, st_rt)
+ )
if tag:
- if "tag" not in ospf_rib_json[
- st_rt]:
- errormsg = ("[DUT: {}]: tag is not"
- " present for"
- " route {} in RIB \n".\
- format(dut, st_rt
- ))
+ if "tag" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: tag is not"
+ " present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
return errormsg
- if _tag != ospf_rib_json[
- st_rt]["tag"]:
- errormsg = ("[DUT: {}]: tag value {}"
- " is not matched for"
- " route {} in RIB \n".\
- format(dut, _tag, st_rt,
- ))
+ if _tag != ospf_rib_json[st_rt]["tag"]:
+ errormsg = (
+ "[DUT: {}]: tag value {}"
+ " is not matched for"
+ " route {} in RIB \n".format(dut, _tag, st_rt,)
+ )
return errormsg
if metric is not None:
- if "type2cost" not in ospf_rib_json[
- st_rt]:
- errormsg = ("[DUT: {}]: metric is"
- " not present for"
- " route {} in RIB \n".\
- format(dut, st_rt))
+ if "type2cost" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: metric is"
+ " not present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
return errormsg
- if metric != ospf_rib_json[
- st_rt]["type2cost"]:
- errormsg = ("[DUT: {}]: metric value "
- "{} is not matched for "
- "route {} in RIB \n".\
- format(dut, metric, st_rt,
- ))
+ if metric != ospf_rib_json[st_rt]["type2cost"]:
+ errormsg = (
+ "[DUT: {}]: metric value "
+ "{} is not matched for "
+ "route {} in RIB \n".format(dut, metric, st_rt,)
+ )
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
- logger.info("[DUT: {}]: Found next_hop {} for all OSPF"
- " routes in RIB".format(router, next_hop))
+ logger.info(
+ "[DUT: {}]: Found next_hop {} for all OSPF"
+ " routes in RIB".format(router, next_hop)
+ )
if len(missing_routes) > 0:
- errormsg = ("[DUT: {}]: Missing route in RIB, "
- "routes: {}".\
- format(dut, missing_routes))
+ errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
+ dut, missing_routes
+ )
return errormsg
if found_routes:
- logger.info("[DUT: %s]: Verified routes in RIB, found"
- " routes are: %s\n", dut, found_routes)
+ logger.info(
+ "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
+ dut,
+ found_routes,
+ )
result = True
logger.info("Exiting lib API: verify_ospf_rib()")
@@ -886,7 +923,7 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
@retry(attempts=10, wait=2, return_is_str=True)
-def verify_ospf_interface(tgen, topo, dut=None,lan=False, input_dict=None):
+def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
@@ -928,15 +965,14 @@ def verify_ospf_interface(tgen, topo, dut=None,lan=False, input_dict=None):
logger.debug("Entering lib API: verify_ospf_interface()")
result = False
for router, rnode in tgen.routers().items():
- if 'ospf' not in topo['routers'][router]:
+ if "ospf" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF interface on router %s:", router)
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
@@ -946,19 +982,29 @@ def verify_ospf_interface(tgen, topo, dut=None,lan=False, input_dict=None):
# To find neighbor ip type
ospf_intf_data = input_dict[router]["links"]
for ospf_intf, intf_data in ospf_intf_data.items():
- intf = topo['routers'][router]['links'][ospf_intf]['interface']
- if intf in show_ospf_json['interfaces']:
- for intf_attribute in intf_data['ospf']:
- if intf_data['ospf'][intf_attribute] == show_ospf_json[
- 'interfaces'][intf][intf_attribute]:
- logger.info("[DUT: %s] OSPF interface %s: %s is %s",
- router, intf, intf_attribute, intf_data['ospf'][
- intf_attribute])
+ intf = topo["routers"][router]["links"][ospf_intf]["interface"]
+ if intf in show_ospf_json["interfaces"]:
+ for intf_attribute in intf_data["ospf"]:
+ if (
+ intf_data["ospf"][intf_attribute]
+ == show_ospf_json["interfaces"][intf][intf_attribute]
+ ):
+ logger.info(
+ "[DUT: %s] OSPF interface %s: %s is %s",
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf"][intf_attribute],
+ )
else:
- errormsg= "[DUT: {}] OSPF interface {}: {} is {}, \
- Expected is {}".format(router, intf, intf_attribute,
- intf_data['ospf'][intf_attribute], show_ospf_json[
- 'interfaces'][intf][intf_attribute])
+ errormsg = "[DUT: {}] OSPF interface {}: {} is {}, \
+ Expected is {}".format(
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf"][intf_attribute],
+ show_ospf_json["interfaces"][intf][intf_attribute],
+ )
return errormsg
result = True
logger.debug("Exiting API: verify_ospf_interface()")
@@ -1016,16 +1062,14 @@ def verify_ospf_database(tgen, topo, dut, input_dict):
router = dut
logger.debug("Entering lib API: verify_ospf_database()")
- if 'ospf' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
- dut)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
@@ -1033,82 +1077,103 @@ def verify_ospf_database(tgen, topo, dut, input_dict):
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
- ospf_external_lsa = input_dict.setdefault(
- 'AS External Link States', None)
+ ospf_external_lsa = input_dict.setdefault("AS External Link States", None)
if ospf_db_data:
- for ospf_area, area_lsa in ospf_db_data.items():
- if ospf_area in show_ospf_json['areas']:
- if 'Router Link States' in area_lsa:
- for lsa in area_lsa['Router Link States']:
- if lsa in show_ospf_json['areas'][ospf_area][
- 'Router Link States']:
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:Router "
- "LSA %s", router, ospf_area, lsa)
- result = True
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB area {}: expected" \
+ for ospf_area, area_lsa in ospf_db_data.items():
+ if ospf_area in show_ospf_json["areas"]:
+ if "Router Link States" in area_lsa:
+ for lsa in area_lsa["Router Link States"]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area]["Router Link States"]
+ ):
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
" Router LSA is {}".format(router, ospf_area, lsa)
- return errormsg
- if 'Net Link States' in area_lsa:
- for lsa in area_lsa['Net Link States']:
- if lsa in show_ospf_json['areas'][ospf_area][
- 'Net Link States']:
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:Network "
- "LSA %s", router, ospf_area, lsa)
- result = True
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB area {}: expected" \
+ )
+ return errormsg
+ if "Net Link States" in area_lsa:
+ for lsa in area_lsa["Net Link States"]:
+ if lsa in show_ospf_json["areas"][ospf_area]["Net Link States"]:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
- return errormsg
- if 'Summary Link States' in area_lsa:
- for lsa in area_lsa['Summary Link States']:
- if lsa in show_ospf_json['areas'][ospf_area][
- 'Summary Link States']:
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:Summary "
- "LSA %s", router, ospf_area, lsa)
- result = True
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB area {}: expected" \
+ )
+ return errormsg
+ if "Summary Link States" in area_lsa:
+ for lsa in area_lsa["Summary Link States"]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area]["Summary Link States"]
+ ):
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
- return errormsg
- if 'ASBR-Summary Link States' in area_lsa:
- for lsa in area_lsa['ASBR-Summary Link States']:
- if lsa in show_ospf_json['areas'][ospf_area][
- 'ASBR-Summary Link States']:
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:ASBR Summary "
- "LSA %s", router, ospf_area, lsa)
- result = True
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB area {}: expected" \
- " ASBR Summary LSA is {}".format(
- router, ospf_area, lsa)
- return errormsg
+ )
+ return errormsg
+ if "ASBR-Summary Link States" in area_lsa:
+ for lsa in area_lsa["ASBR-Summary Link States"]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area][
+ "ASBR-Summary Link States"
+ ]
+ ):
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
if ospf_external_lsa:
- for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items():
- if ospf_ext_lsa in show_ospf_json['AS External Link States']:
- logger.info(
- "[DUT: %s] OSPF LSDB:External LSA %s",
- router, ospf_ext_lsa)
- result = True
- else:
- errormsg = \
- "[DUT: {}] OSPF LSDB : expected" \
- " External LSA is {}".format(router, ospf_ext_lsa)
- return errormsg
+ for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items():
+ if ospf_ext_lsa in show_ospf_json["AS External Link States"]:
+ logger.info(
+ "[DUT: %s] OSPF LSDB:External LSA %s", router, ospf_ext_lsa
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB : expected"
+ " External LSA is {}".format(router, ospf_ext_lsa)
+ )
+ return errormsg
logger.debug("Exiting API: verify_ospf_database()")
return result
-
@retry(attempts=10, wait=2, return_is_str=True)
def verify_ospf_summary(tgen, topo, dut, input_dict):
"""
@@ -1146,14 +1211,12 @@ def verify_ospf_summary(tgen, topo, dut, input_dict):
logger.info("Verifying OSPF summary on router %s:", router)
- if 'ospf' not in topo['routers'][dut]:
- errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
- router)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
return errormsg
rnode = tgen.routers()[dut]
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json",
- isjson=True)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
@@ -1165,17 +1228,25 @@ def verify_ospf_summary(tgen, topo, dut, input_dict):
for ospf_summ, summ_data in ospf_summary_data.items():
if ospf_summ not in show_ospf_json:
continue
- summary = ospf_summary_data[ospf_summ]['Summary address']
+ summary = ospf_summary_data[ospf_summ]["Summary address"]
if summary in show_ospf_json:
for summ in summ_data:
if summ_data[summ] == show_ospf_json[summary][summ]:
- logger.info("[DUT: %s] OSPF summary %s:%s is %s",
- router, summary, summ, summ_data[summ])
+ logger.info(
+ "[DUT: %s] OSPF summary %s:%s is %s",
+ router,
+ summary,
+ summ,
+ summ_data[summ],
+ )
result = True
else:
- errormsg = ("[DUT: {}] OSPF summary {}:{} is %s, "
- "Expected is {}".format(router,summary, summ,
- show_ospf_json[summary][summ]))
+ errormsg = (
+ "[DUT: {}] OSPF summary {}:{} is %s, "
+ "Expected is {}".format(
+ router, summary, summ, show_ospf_json[summary][summ]
+ )
+ )
return errormsg
logger.debug("Exiting API: verify_ospf_summary()")