diff options
Diffstat (limited to 'tests/topotests/lib/ospf.py')
| -rw-r--r-- | tests/topotests/lib/ospf.py | 689 |
1 files changed, 380 insertions, 309 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index 9d6b8fa691..9f3d4841b0 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -26,12 +26,15 @@ import ipaddr from lib.topotest import frr_unicode # Import common_config to use commomnly used APIs -from lib.common_config import (create_common_configuration, - InvalidCLIError, retry, - generate_ips, - check_address_types, - validate_ip_address, - run_frr_cmd) +from lib.common_config import ( + create_common_configuration, + InvalidCLIError, + retry, + generate_ips, + check_address_types, + validate_ip_address, + run_frr_cmd, +) LOGDIR = "/tmp/topotests/" TMPDIR = None @@ -40,9 +43,8 @@ TMPDIR = None # Configure procs ################################ -def create_router_ospf( - tgen, topo, input_dict=None, build=False, - load_config=True): + +def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=True): """ API to configure ospf on router. @@ -84,19 +86,15 @@ def create_router_ospf( logger.debug("Router %s: 'ospf' not present in input_dict", router) continue - result = __create_ospf_global( - tgen, input_dict, router, build, load_config) + result = __create_ospf_global(tgen, input_dict, router, build, load_config) if result is True: ospf_data = input_dict[router]["ospf"] - logger.debug("Exiting lib API: create_router_ospf()") return result -def __create_ospf_global( - tgen, input_dict, router, build=False, - load_config=True): +def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True): """ Helper API to create ospf global configuration. @@ -121,9 +119,9 @@ def __create_ospf_global( del_ospf_action = ospf_data.setdefault("delete", False) if del_ospf_action: config_data = ["no router ospf"] - result = create_common_configuration(tgen, router, config_data, - "ospf", build, - load_config) + result = create_common_configuration( + tgen, router, config_data, "ospf", build, load_config + ) return result config_data = [] @@ -137,34 +135,33 @@ def __create_ospf_global( if del_router_id: config_data.append("no ospf router-id") if router_id: - config_data.append("ospf router-id {}".format( - router_id)) + config_data.append("ospf router-id {}".format(router_id)) # redistribute command redistribute_data = ospf_data.setdefault("redistribute", {}) if redistribute_data: for redistribute in redistribute_data: if "redist_type" not in redistribute: - logger.debug("Router %s: 'redist_type' not present in " - "input_dict", router) + logger.debug( + "Router %s: 'redist_type' not present in " "input_dict", router + ) else: - cmd = "redistribute {}".format( - redistribute["redist_type"]) + cmd = "redistribute {}".format(redistribute["redist_type"]) for red_type in redistribute_data: if "route_map" in red_type: - cmd = cmd + " route-map {}".format(red_type[ - 'route_map']) + cmd = cmd + " route-map {}".format(red_type["route_map"]) del_action = redistribute.setdefault("delete", False) if del_action: cmd = "no {}".format(cmd) config_data.append(cmd) - #area information + # area information area_data = ospf_data.setdefault("area", {}) if area_data: for area in area_data: if "id" not in area: - logger.debug("Router %s: 'area id' not present in " - "input_dict", router) + logger.debug( + "Router %s: 'area id' not present in " "input_dict", router + ) else: cmd = "area {}".format(area["id"]) @@ -175,19 +172,21 @@ def __create_ospf_global( if del_action: cmd = "no {}".format(cmd) config_data.append(cmd) - result = create_common_configuration(tgen, router, config_data, - "ospf", build, load_config) + result = create_common_configuration( + tgen, router, config_data, "ospf", build, load_config + ) # summary information summary_data = ospf_data.setdefault("summary-address", {}) if summary_data: for summary in summary_data: if "prefix" not in summary: - logger.debug("Router %s: 'summary-address' not present in " - "input_dict", router) + logger.debug( + "Router %s: 'summary-address' not present in " "input_dict", + router, + ) else: - cmd = "summary {}/{}".format(summary["prefix"], summary[ - "mask"]) + cmd = "summary {}/{}".format(summary["prefix"], summary["mask"]) _tag = summary.setdefault("tag", None) if _tag: @@ -201,8 +200,9 @@ def __create_ospf_global( if del_action: cmd = "no {}".format(cmd) config_data.append(cmd) - result = create_common_configuration(tgen, router, config_data, - "ospf", build, load_config) + result = create_common_configuration( + tgen, router, config_data, "ospf", build, load_config + ) except InvalidCLIError: # Traceback @@ -214,9 +214,7 @@ def __create_ospf_global( return result -def create_router_ospf6( - tgen, topo, input_dict=None, build=False, - load_config=True): +def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True): """ API to configure ospf on router @@ -253,16 +251,13 @@ def create_router_ospf6( logger.debug("Router %s: 'ospf' not present in input_dict", router) continue - result = __create_ospf_global( - tgen, input_dict, router, build, load_config) + result = __create_ospf_global(tgen, input_dict, router, build, load_config) logger.debug("Exiting lib API: create_router_ospf()") return result -def __create_ospf6_global( - tgen, input_dict, router, build=False, - load_config=True): +def __create_ospf6_global(tgen, input_dict, router, build=False, load_config=True): """ Helper API to create ospf global configuration. @@ -286,9 +281,9 @@ def __create_ospf6_global( del_ospf_action = ospf_data.setdefault("delete", False) if del_ospf_action: config_data = ["no ipv6 router ospf"] - result = create_common_configuration(tgen, router, config_data, - "ospf", build, - load_config) + result = create_common_configuration( + tgen, router, config_data, "ospf", build, load_config + ) return result config_data = [] @@ -301,11 +296,11 @@ def __create_ospf6_global( if del_router_id: config_data.append("no ospf router-id") if router_id: - config_data.append("ospf router-id {}".format( - router_id)) + config_data.append("ospf router-id {}".format(router_id)) - result = create_common_configuration(tgen, router, config_data, - "ospf", build, load_config) + result = create_common_configuration( + tgen, router, config_data, "ospf", build, load_config + ) except InvalidCLIError: # Traceback errormsg = traceback.format_exc() @@ -315,8 +310,8 @@ def __create_ospf6_global( logger.debug("Exiting lib API: create_ospf_global()") return result -def config_ospf_interface (tgen, topo, input_dict=None, build=False, - load_config=True): + +def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=True): """ API to configure ospf on router. @@ -356,22 +351,25 @@ def config_ospf_interface (tgen, topo, input_dict=None, build=False, input_dict = deepcopy(input_dict) for router in input_dict.keys(): config_data = [] - for lnk in input_dict[router]['links'].keys(): - if "ospf" not in input_dict[router]['links'][lnk]: - logger.debug("Router %s: ospf configs is not present in" - "input_dict, passed input_dict", router, - input_dict) + for lnk in input_dict[router]["links"].keys(): + if "ospf" not in input_dict[router]["links"][lnk]: + logger.debug( + "Router %s: ospf configs is not present in" + "input_dict, passed input_dict", + router, + input_dict, + ) continue - ospf_data = input_dict[router]['links'][lnk]['ospf'] + ospf_data = input_dict[router]["links"][lnk]["ospf"] data_ospf_area = ospf_data.setdefault("area", None) data_ospf_auth = ospf_data.setdefault("authentication", None) data_ospf_dr_priority = ospf_data.setdefault("priority", None) data_ospf_cost = ospf_data.setdefault("cost", None) try: - intf = topo['routers'][router]['links'][lnk]['interface'] + intf = topo["routers"][router]["links"][lnk]["interface"] except KeyError: - intf = topo['switches'][router]['links'][lnk]['interface'] + intf = topo["switches"][router]["links"][lnk]["interface"] # interface cmd = "interface {}".format(intf) @@ -383,58 +381,60 @@ def config_ospf_interface (tgen, topo, input_dict=None, build=False, config_data.append(cmd) # interface ospf auth if data_ospf_auth: - if data_ospf_auth == 'null': + if data_ospf_auth == "null": cmd = "ip ospf authentication null" - elif data_ospf_auth == 'message-digest': + elif data_ospf_auth == "message-digest": cmd = "ip ospf authentication message-digest" else: cmd = "ip ospf authentication" - if 'del_action' in ospf_data: + if "del_action" in ospf_data: cmd = "no {}".format(cmd) config_data.append(cmd) if "message-digest-key" in ospf_data: cmd = "ip ospf message-digest-key {} md5 {}".format( - ospf_data["message-digest-key"],ospf_data[ - "authentication-key"]) - if 'del_action' in ospf_data: + ospf_data["message-digest-key"], ospf_data["authentication-key"] + ) + if "del_action" in ospf_data: cmd = "no {}".format(cmd) config_data.append(cmd) - if "authentication-key" in ospf_data and \ - "message-digest-key" not in ospf_data: - cmd = "ip ospf authentication-key {}".format(ospf_data[ - "authentication-key"]) - if 'del_action' in ospf_data: + if ( + "authentication-key" in ospf_data + and "message-digest-key" not in ospf_data + ): + cmd = "ip ospf authentication-key {}".format( + ospf_data["authentication-key"] + ) + if "del_action" in ospf_data: cmd = "no {}".format(cmd) config_data.append(cmd) # interface ospf dr priority if data_ospf_dr_priority in ospf_data: - cmd = "ip ospf priority {}".format( - ospf_data["priority"]) - if 'del_action' in ospf_data: + cmd = "ip ospf priority {}".format(ospf_data["priority"]) + if "del_action" in ospf_data: cmd = "no {}".format(cmd) config_data.append(cmd) # interface ospf cost if data_ospf_cost in ospf_data: - cmd = "ip ospf cost {}".format( - ospf_data["cost"]) - if 'del_action' in ospf_data: + cmd = "ip ospf cost {}".format(ospf_data["cost"]) + if "del_action" in ospf_data: cmd = "no {}".format(cmd) config_data.append(cmd) if build: return config_data else: - result = create_common_configuration(tgen, router, config_data, - "interface_config", - build=build) + result = create_common_configuration( + tgen, router, config_data, "interface_config", build=build + ) logger.debug("Exiting lib API: create_igmp_config()") return result + def clear_ospf(tgen, router): """ This API is to clear ospf neighborship by running @@ -517,15 +517,16 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False): result = False if input_dict: for router, rnode in tgen.routers().items(): - if 'ospf' not in topo['routers'][router]: + if "ospf" not in topo["routers"][router]: continue if dut is not None and dut != router: continue logger.info("Verifying OSPF neighborship on router %s:", router) - show_ospf_json = run_frr_cmd(rnode, - "show ip ospf neighbor all json", isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ip ospf neighbor all json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): @@ -533,126 +534,134 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False): return errormsg ospf_data_list = input_dict[router]["ospf"] - ospf_nbr_list = ospf_data_list['neighbors'] + ospf_nbr_list = ospf_data_list["neighbors"] for ospf_nbr, nbr_data in ospf_nbr_list.items(): - data_ip = topo['routers'][ospf_nbr]['links'] - data_rid = topo['routers'][ospf_nbr]['ospf']['router_id'] + data_ip = topo["routers"][ospf_nbr]["links"] + data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"] if ospf_nbr in data_ip: nbr_details = nbr_data[ospf_nbr] elif lan: - for switch in topo['switches']: - if 'ospf' in topo['switches'][switch]['links'][router]: - neighbor_ip = data_ip[switch]['ipv4'].split("/")[0] + for switch in topo["switches"]: + if "ospf" in topo["switches"][switch]["links"][router]: + neighbor_ip = data_ip[switch]["ipv4"].split("/")[0] else: continue else: - neighbor_ip = data_ip[router]['ipv4'].split("/")[0] + neighbor_ip = data_ip[router]["ipv4"].split("/")[0] nh_state = None neighbor_ip = neighbor_ip.lower() nbr_rid = data_rid try: - nh_state = show_ospf_json[nbr_rid][0][ - 'state'].split('/')[0] - intf_state = show_ospf_json[nbr_rid][0][ - 'state'].split('/')[1] + nh_state = show_ospf_json[nbr_rid][0]["state"].split("/")[0] + intf_state = show_ospf_json[nbr_rid][0]["state"].split("/")[1] except KeyError: - errormsg = "[DUT: {}] OSPF peer {} missing".format(router, - nbr_rid) + errormsg = "[DUT: {}] OSPF peer {} missing".format(router, nbr_rid) return errormsg - nbr_state = nbr_data.setdefault("state",None) - nbr_role = nbr_data.setdefault("role",None) + nbr_state = nbr_data.setdefault("state", None) + nbr_role = nbr_data.setdefault("role", None) if nbr_state: if nbr_state == nh_state: - logger.info("[DUT: {}] OSPF Nbr is {}:{} State {}".format - (router, ospf_nbr, nbr_rid, nh_state)) + logger.info( + "[DUT: {}] OSPF Nbr is {}:{} State {}".format( + router, ospf_nbr, nbr_rid, nh_state + ) + ) result = True else: - errormsg = ("[DUT: {}] OSPF is not Converged, neighbor" - " state is {}".format(router, nh_state)) + errormsg = ( + "[DUT: {}] OSPF is not Converged, neighbor" + " state is {}".format(router, nh_state) + ) return errormsg if nbr_role: if nbr_role == intf_state: - logger.info("[DUT: {}] OSPF Nbr is {}: {} Role {}".format( - router, ospf_nbr, nbr_rid, nbr_role)) + logger.info( + "[DUT: {}] OSPF Nbr is {}: {} Role {}".format( + router, ospf_nbr, nbr_rid, nbr_role + ) + ) else: - errormsg = ("[DUT: {}] OSPF is not Converged with rid" - "{}, role is {}".format(router, nbr_rid, intf_state)) + errormsg = ( + "[DUT: {}] OSPF is not Converged with rid" + "{}, role is {}".format(router, nbr_rid, intf_state) + ) return errormsg continue else: for router, rnode in tgen.routers().items(): - if 'ospf' not in topo['routers'][router]: + if "ospf" not in topo["routers"][router]: continue if dut is not None and dut != router: continue logger.info("Verifying OSPF neighborship on router %s:", router) - show_ospf_json = run_frr_cmd(rnode, - "show ip ospf neighbor all json", isjson=True) + show_ospf_json = run_frr_cmd( + rnode, "show ip ospf neighbor all json", isjson=True + ) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): errormsg = "OSPF is not running" return errormsg ospf_data_list = topo["routers"][router]["ospf"] - ospf_neighbors = ospf_data_list['neighbors'] + ospf_neighbors = ospf_data_list["neighbors"] total_peer = 0 total_peer = len(ospf_neighbors.keys()) no_of_ospf_nbr = 0 - ospf_nbr_list = ospf_data_list['neighbors'] + ospf_nbr_list = ospf_data_list["neighbors"] no_of_peer = 0 for ospf_nbr, nbr_data in ospf_nbr_list.items(): if nbr_data: - data_ip = topo['routers'][nbr_data["nbr"]]['links'] - data_rid = topo['routers'][nbr_data["nbr"]][ - 'ospf']['router_id'] + data_ip = topo["routers"][nbr_data["nbr"]]["links"] + data_rid = topo["routers"][nbr_data["nbr"]]["ospf"]["router_id"] else: - data_ip = topo['routers'][ospf_nbr]['links'] - data_rid = topo['routers'][ospf_nbr]['ospf']['router_id'] + data_ip = topo["routers"][ospf_nbr]["links"] + data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"] if ospf_nbr in data_ip: nbr_details = nbr_data[ospf_nbr] elif lan: - for switch in topo['switches']: - if 'ospf' in topo['switches'][switch]['links'][router]: - neighbor_ip = data_ip[switch]['ipv4'].split("/")[0] + for switch in topo["switches"]: + if "ospf" in topo["switches"][switch]["links"][router]: + neighbor_ip = data_ip[switch]["ipv4"].split("/")[0] else: continue else: - neighbor_ip = data_ip[router]['ipv4'].split("/")[0] + neighbor_ip = data_ip[router]["ipv4"].split("/")[0] nh_state = None neighbor_ip = neighbor_ip.lower() nbr_rid = data_rid try: - nh_state = show_ospf_json[nbr_rid][0][ - 'state'].split('/')[0] + nh_state = show_ospf_json[nbr_rid][0]["state"].split("/")[0] except KeyError: - errormsg = "[DUT: {}] OSPF peer {} missing,from "\ - "{} ".format(router, - nbr_rid, ospf_nbr) + errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format( + router, nbr_rid, ospf_nbr + ) return errormsg - if nh_state == 'Full': + if nh_state == "Full": no_of_peer += 1 if no_of_peer == total_peer: logger.info("[DUT: {}] OSPF is Converged".format(router)) result = True else: - errormsg = ("[DUT: {}] OSPF is not Converged".format(router)) + errormsg = "[DUT: {}] OSPF is not Converged".format(router) return errormsg logger.debug("Exiting API: verify_ospf_neighbor()") return result + @retry(attempts=21, wait=2, return_is_str=True) -def verify_ospf_rib(tgen, dut, input_dict, next_hop=None, - tag=None, metric=None, fib=None): +def verify_ospf_rib( + tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None +): """ This API is to verify ospf routes by running show ip ospf route command. @@ -706,25 +715,28 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None, found_routes = [] missing_routes = [] - if "static_routes" in input_dict[routerInput] or \ - "prefix" in input_dict[routerInput]: + if ( + "static_routes" in input_dict[routerInput] + or "prefix" in input_dict[routerInput] + ): if "prefix" in input_dict[routerInput]: static_routes = input_dict[routerInput]["prefix"] else: static_routes = input_dict[routerInput]["static_routes"] - for static_route in static_routes: cmd = "{}".format(command) cmd = "{} json".format(cmd) - ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True) + ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True) # Verifying output dictionary ospf_rib_json is not empty if bool(ospf_rib_json) is False: - errormsg = "[DUT: {}] No routes found in OSPF route " \ + errormsg = ( + "[DUT: {}] No routes found in OSPF route " "table".format(router) + ) return errormsg network = static_route["network"] @@ -732,7 +744,6 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None, _tag = static_route.setdefault("tag", None) _rtype = static_route.setdefault("routeType", None) - # Generating IPs for verification ip_list = generate_ips(network, no_of_ip) st_found = False @@ -742,7 +753,7 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None, st_rt = str(ipaddr.IPNetwork(frr_unicode(st_rt))) _addr_type = validate_ip_address(st_rt) - if _addr_type != 'ipv4': + if _addr_type != "ipv4": continue if st_rt in ospf_rib_json: @@ -754,17 +765,26 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None, next_hop = [next_hop] for mnh in range(0, len(ospf_rib_json[st_rt])): - if 'fib' in ospf_rib_json[st_rt][ - mnh]["nexthops"][0]: - found_hops.append([rib_r[ - "ip"] for rib_r in ospf_rib_json[ - st_rt][mnh]["nexthops"]]) + if ( + "fib" + in ospf_rib_json[st_rt][mnh]["nexthops"][0] + ): + found_hops.append( + [ + rib_r["ip"] + for rib_r in ospf_rib_json[st_rt][mnh][ + "nexthops" + ] + ] + ) if found_hops[0]: - missing_list_of_nexthops = \ - set(found_hops[0]).difference(next_hop) - additional_nexthops_in_required_nhs = \ - set(next_hop).difference(found_hops[0]) + missing_list_of_nexthops = set( + found_hops[0] + ).difference(next_hop) + additional_nexthops_in_required_nhs = set( + next_hop + ).difference(found_hops[0]) if additional_nexthops_in_required_nhs: logger.info( @@ -772,13 +792,18 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None, "%s is not active for route %s in " "RIB of router %s\n", additional_nexthops_in_required_nhs, - st_rt, dut) + st_rt, + dut, + ) errormsg = ( "Nexthop {} is not active" " for route {} in RIB of router" " {}\n".format( - additional_nexthops_in_required_nhs, - st_rt, dut)) + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) return errormsg else: nh_found = True @@ -786,99 +811,111 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None, elif next_hop and fib is None: if type(next_hop) is not list: next_hop = [next_hop] - found_hops = [rib_r["ip"] for rib_r in - ospf_rib_json[st_rt][ - "nexthops"]] + found_hops = [ + rib_r["ip"] + for rib_r in ospf_rib_json[st_rt]["nexthops"] + ] if found_hops: - missing_list_of_nexthops = \ - set(found_hops).difference(next_hop) - additional_nexthops_in_required_nhs = \ - set(next_hop).difference(found_hops) + missing_list_of_nexthops = set( + found_hops + ).difference(next_hop) + additional_nexthops_in_required_nhs = set( + next_hop + ).difference(found_hops) if additional_nexthops_in_required_nhs: logger.info( - "Missing nexthop %s for route"\ - " %s in RIB of router %s\n", \ - additional_nexthops_in_required_nhs, \ - st_rt, dut) - errormsg=("Nexthop {} is Missing for "\ - "route {} in RIB of router {}\n".format( + "Missing nexthop %s for route" + " %s in RIB of router %s\n", additional_nexthops_in_required_nhs, - st_rt, dut)) + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) return errormsg else: nh_found = True if _rtype: - if "routeType" not in ospf_rib_json[ - st_rt]: - errormsg = ("[DUT: {}]: routeType missing" - "for route {} in OSPF RIB \n".\ - format(dut, st_rt)) + if "routeType" not in ospf_rib_json[st_rt]: + errormsg = ( + "[DUT: {}]: routeType missing" + "for route {} in OSPF RIB \n".format(dut, st_rt) + ) return errormsg - elif _rtype != ospf_rib_json[st_rt][ - "routeType"]: - errormsg = ("[DUT: {}]: routeType mismatch" - "for route {} in OSPF RIB \n".\ - format(dut, st_rt)) + elif _rtype != ospf_rib_json[st_rt]["routeType"]: + errormsg = ( + "[DUT: {}]: routeType mismatch" + "for route {} in OSPF RIB \n".format(dut, st_rt) + ) return errormsg else: - logger.info("DUT: {}]: Found routeType {}" - "for route {}".\ - format(dut, _rtype, st_rt)) + logger.info( + "DUT: {}]: Found routeType {}" + "for route {}".format(dut, _rtype, st_rt) + ) if tag: - if "tag" not in ospf_rib_json[ - st_rt]: - errormsg = ("[DUT: {}]: tag is not" - " present for" - " route {} in RIB \n".\ - format(dut, st_rt - )) + if "tag" not in ospf_rib_json[st_rt]: + errormsg = ( + "[DUT: {}]: tag is not" + " present for" + " route {} in RIB \n".format(dut, st_rt) + ) return errormsg - if _tag != ospf_rib_json[ - st_rt]["tag"]: - errormsg = ("[DUT: {}]: tag value {}" - " is not matched for" - " route {} in RIB \n".\ - format(dut, _tag, st_rt, - )) + if _tag != ospf_rib_json[st_rt]["tag"]: + errormsg = ( + "[DUT: {}]: tag value {}" + " is not matched for" + " route {} in RIB \n".format(dut, _tag, st_rt,) + ) return errormsg if metric is not None: - if "type2cost" not in ospf_rib_json[ - st_rt]: - errormsg = ("[DUT: {}]: metric is" - " not present for" - " route {} in RIB \n".\ - format(dut, st_rt)) + if "type2cost" not in ospf_rib_json[st_rt]: + errormsg = ( + "[DUT: {}]: metric is" + " not present for" + " route {} in RIB \n".format(dut, st_rt) + ) return errormsg - if metric != ospf_rib_json[ - st_rt]["type2cost"]: - errormsg = ("[DUT: {}]: metric value " - "{} is not matched for " - "route {} in RIB \n".\ - format(dut, metric, st_rt, - )) + if metric != ospf_rib_json[st_rt]["type2cost"]: + errormsg = ( + "[DUT: {}]: metric value " + "{} is not matched for " + "route {} in RIB \n".format(dut, metric, st_rt,) + ) return errormsg else: missing_routes.append(st_rt) if nh_found: - logger.info("[DUT: {}]: Found next_hop {} for all OSPF" - " routes in RIB".format(router, next_hop)) + logger.info( + "[DUT: {}]: Found next_hop {} for all OSPF" + " routes in RIB".format(router, next_hop) + ) if len(missing_routes) > 0: - errormsg = ("[DUT: {}]: Missing route in RIB, " - "routes: {}".\ - format(dut, missing_routes)) + errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format( + dut, missing_routes + ) return errormsg if found_routes: - logger.info("[DUT: %s]: Verified routes in RIB, found" - " routes are: %s\n", dut, found_routes) + logger.info( + "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n", + dut, + found_routes, + ) result = True logger.info("Exiting lib API: verify_ospf_rib()") @@ -886,7 +923,7 @@ def verify_ospf_rib(tgen, dut, input_dict, next_hop=None, @retry(attempts=10, wait=2, return_is_str=True) -def verify_ospf_interface(tgen, topo, dut=None,lan=False, input_dict=None): +def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None): """ This API is to verify ospf routes by running show ip ospf interface command. @@ -928,15 +965,14 @@ def verify_ospf_interface(tgen, topo, dut=None,lan=False, input_dict=None): logger.debug("Entering lib API: verify_ospf_interface()") result = False for router, rnode in tgen.routers().items(): - if 'ospf' not in topo['routers'][router]: + if "ospf" not in topo["routers"][router]: continue if dut is not None and dut != router: continue logger.info("Verifying OSPF interface on router %s:", router) - show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json", - isjson=True) + show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json", isjson=True) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): @@ -946,19 +982,29 @@ def verify_ospf_interface(tgen, topo, dut=None,lan=False, input_dict=None): # To find neighbor ip type ospf_intf_data = input_dict[router]["links"] for ospf_intf, intf_data in ospf_intf_data.items(): - intf = topo['routers'][router]['links'][ospf_intf]['interface'] - if intf in show_ospf_json['interfaces']: - for intf_attribute in intf_data['ospf']: - if intf_data['ospf'][intf_attribute] == show_ospf_json[ - 'interfaces'][intf][intf_attribute]: - logger.info("[DUT: %s] OSPF interface %s: %s is %s", - router, intf, intf_attribute, intf_data['ospf'][ - intf_attribute]) + intf = topo["routers"][router]["links"][ospf_intf]["interface"] + if intf in show_ospf_json["interfaces"]: + for intf_attribute in intf_data["ospf"]: + if ( + intf_data["ospf"][intf_attribute] + == show_ospf_json["interfaces"][intf][intf_attribute] + ): + logger.info( + "[DUT: %s] OSPF interface %s: %s is %s", + router, + intf, + intf_attribute, + intf_data["ospf"][intf_attribute], + ) else: - errormsg= "[DUT: {}] OSPF interface {}: {} is {}, \ - Expected is {}".format(router, intf, intf_attribute, - intf_data['ospf'][intf_attribute], show_ospf_json[ - 'interfaces'][intf][intf_attribute]) + errormsg = "[DUT: {}] OSPF interface {}: {} is {}, \ + Expected is {}".format( + router, + intf, + intf_attribute, + intf_data["ospf"][intf_attribute], + show_ospf_json["interfaces"][intf][intf_attribute], + ) return errormsg result = True logger.debug("Exiting API: verify_ospf_interface()") @@ -1016,16 +1062,14 @@ def verify_ospf_database(tgen, topo, dut, input_dict): router = dut logger.debug("Entering lib API: verify_ospf_database()") - if 'ospf' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF is not configured on the router.".format( - dut) + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut) return errormsg rnode = tgen.routers()[dut] logger.info("Verifying OSPF interface on router %s:", dut) - show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", - isjson=True) + show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): errormsg = "OSPF is not running" @@ -1033,82 +1077,103 @@ def verify_ospf_database(tgen, topo, dut, input_dict): # for inter and inter lsa's ospf_db_data = input_dict.setdefault("areas", None) - ospf_external_lsa = input_dict.setdefault( - 'AS External Link States', None) + ospf_external_lsa = input_dict.setdefault("AS External Link States", None) if ospf_db_data: - for ospf_area, area_lsa in ospf_db_data.items(): - if ospf_area in show_ospf_json['areas']: - if 'Router Link States' in area_lsa: - for lsa in area_lsa['Router Link States']: - if lsa in show_ospf_json['areas'][ospf_area][ - 'Router Link States']: - logger.info( - "[DUT: %s] OSPF LSDB area %s:Router " - "LSA %s", router, ospf_area, lsa) - result = True - else: - errormsg = \ - "[DUT: {}] OSPF LSDB area {}: expected" \ + for ospf_area, area_lsa in ospf_db_data.items(): + if ospf_area in show_ospf_json["areas"]: + if "Router Link States" in area_lsa: + for lsa in area_lsa["Router Link States"]: + if ( + lsa + in show_ospf_json["areas"][ospf_area]["Router Link States"] + ): + logger.info( + "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s", + router, + ospf_area, + lsa, + ) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" " Router LSA is {}".format(router, ospf_area, lsa) - return errormsg - if 'Net Link States' in area_lsa: - for lsa in area_lsa['Net Link States']: - if lsa in show_ospf_json['areas'][ospf_area][ - 'Net Link States']: - logger.info( - "[DUT: %s] OSPF LSDB area %s:Network " - "LSA %s", router, ospf_area, lsa) - result = True - else: - errormsg = \ - "[DUT: {}] OSPF LSDB area {}: expected" \ + ) + return errormsg + if "Net Link States" in area_lsa: + for lsa in area_lsa["Net Link States"]: + if lsa in show_ospf_json["areas"][ospf_area]["Net Link States"]: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s", + router, + ospf_area, + lsa, + ) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" " Network LSA is {}".format(router, ospf_area, lsa) - return errormsg - if 'Summary Link States' in area_lsa: - for lsa in area_lsa['Summary Link States']: - if lsa in show_ospf_json['areas'][ospf_area][ - 'Summary Link States']: - logger.info( - "[DUT: %s] OSPF LSDB area %s:Summary " - "LSA %s", router, ospf_area, lsa) - result = True - else: - errormsg = \ - "[DUT: {}] OSPF LSDB area {}: expected" \ + ) + return errormsg + if "Summary Link States" in area_lsa: + for lsa in area_lsa["Summary Link States"]: + if ( + lsa + in show_ospf_json["areas"][ospf_area]["Summary Link States"] + ): + logger.info( + "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s", + router, + ospf_area, + lsa, + ) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" " Summary LSA is {}".format(router, ospf_area, lsa) - return errormsg - if 'ASBR-Summary Link States' in area_lsa: - for lsa in area_lsa['ASBR-Summary Link States']: - if lsa in show_ospf_json['areas'][ospf_area][ - 'ASBR-Summary Link States']: - logger.info( - "[DUT: %s] OSPF LSDB area %s:ASBR Summary " - "LSA %s", router, ospf_area, lsa) - result = True - else: - errormsg = \ - "[DUT: {}] OSPF LSDB area {}: expected" \ - " ASBR Summary LSA is {}".format( - router, ospf_area, lsa) - return errormsg + ) + return errormsg + if "ASBR-Summary Link States" in area_lsa: + for lsa in area_lsa["ASBR-Summary Link States"]: + if ( + lsa + in show_ospf_json["areas"][ospf_area][ + "ASBR-Summary Link States" + ] + ): + logger.info( + "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s", + router, + ospf_area, + lsa, + ) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " ASBR Summary LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg if ospf_external_lsa: - for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items(): - if ospf_ext_lsa in show_ospf_json['AS External Link States']: - logger.info( - "[DUT: %s] OSPF LSDB:External LSA %s", - router, ospf_ext_lsa) - result = True - else: - errormsg = \ - "[DUT: {}] OSPF LSDB : expected" \ - " External LSA is {}".format(router, ospf_ext_lsa) - return errormsg + for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items(): + if ospf_ext_lsa in show_ospf_json["AS External Link States"]: + logger.info( + "[DUT: %s] OSPF LSDB:External LSA %s", router, ospf_ext_lsa + ) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB : expected" + " External LSA is {}".format(router, ospf_ext_lsa) + ) + return errormsg logger.debug("Exiting API: verify_ospf_database()") return result - @retry(attempts=10, wait=2, return_is_str=True) def verify_ospf_summary(tgen, topo, dut, input_dict): """ @@ -1146,14 +1211,12 @@ def verify_ospf_summary(tgen, topo, dut, input_dict): logger.info("Verifying OSPF summary on router %s:", router) - if 'ospf' not in topo['routers'][dut]: - errormsg = "[DUT: {}] OSPF is not configured on the router.".format( - router) + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router) return errormsg rnode = tgen.routers()[dut] - show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", - isjson=True) + show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", isjson=True) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): @@ -1165,17 +1228,25 @@ def verify_ospf_summary(tgen, topo, dut, input_dict): for ospf_summ, summ_data in ospf_summary_data.items(): if ospf_summ not in show_ospf_json: continue - summary = ospf_summary_data[ospf_summ]['Summary address'] + summary = ospf_summary_data[ospf_summ]["Summary address"] if summary in show_ospf_json: for summ in summ_data: if summ_data[summ] == show_ospf_json[summary][summ]: - logger.info("[DUT: %s] OSPF summary %s:%s is %s", - router, summary, summ, summ_data[summ]) + logger.info( + "[DUT: %s] OSPF summary %s:%s is %s", + router, + summary, + summ, + summ_data[summ], + ) result = True else: - errormsg = ("[DUT: {}] OSPF summary {}:{} is %s, " - "Expected is {}".format(router,summary, summ, - show_ospf_json[summary][summ])) + errormsg = ( + "[DUT: {}] OSPF summary {}:{} is %s, " + "Expected is {}".format( + router, summary, summ, show_ospf_json[summary][summ] + ) + ) return errormsg logger.debug("Exiting API: verify_ospf_summary()") |
