diff options
Diffstat (limited to 'tests/topotests/lib/ospf.py')
| -rw-r--r-- | tests/topotests/lib/ospf.py | 598 |
1 files changed, 564 insertions, 34 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index 59ae1a2559..7609c7f899 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -1,26 +1,14 @@ +# SPDX-License-Identifier: ISC # # Copyright (c) 2020 by VMware, Inc. ("VMware") # Used Copyright (c) 2018 by Network Device Education Foundation, Inc. # ("NetDEF") in this file. # -# Permission to use, copy, modify, and/or distribute this software -# for any purpose with or without fee is hereby granted, provided -# that the above copyright notice and this permission notice appear -# in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES -# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR -# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY -# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, -# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS -# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE -# OF THIS SOFTWARE. -# import ipaddress import sys from copy import deepcopy +from time import sleep # Import common_config to use commomnly used APIs from lib.common_config import ( @@ -201,6 +189,24 @@ def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf): cmd = "no maximum-paths" config_data.append(cmd) + # Flood reduction. + flood_data = ospf_data.setdefault("flood-reduction", {}) + if flood_data: + cmd = "flood-reduction" + del_action = ospf_data.setdefault("del_flood_reduction", False) + if del_action: + cmd = "no flood-reduction" + config_data.append(cmd) + + # LSA refresh timer - A hidden command. + refresh_data = ospf_data.setdefault("lsa-refresh", {}) + if refresh_data: + cmd = "ospf lsa-refresh {}".format(refresh_data) + del_action = ospf_data.setdefault("del_lsa_refresh", False) + if del_action: + cmd = "no ospf lsa-refresh" + config_data.append(cmd) + # redistribute command redistribute_data = ospf_data.setdefault("redistribute", {}) if redistribute_data: @@ -233,6 +239,9 @@ def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf): if "type" in area: cmd = cmd + " {}".format(area["type"]) + if "flood-reduction" in area: + cmd = cmd + " flood-reduction" + del_action = area.setdefault("delete", False) if del_action: cmd = "no {}".format(cmd) @@ -737,9 +746,6 @@ def verify_ospf_neighbor( return result -################################ -# Verification procs -################################ @retry(retry_timeout=50) def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False): """ @@ -1352,8 +1358,10 @@ def verify_ospf_interface( return result -@retry(retry_timeout=20) -def verify_ospf_database(tgen, topo, dut, input_dict, expected=True): +@retry(retry_timeout=40) +def verify_ospf_database( + tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None, expected=True +): """ This API is to verify ospf lsa's by running show ip ospf database command. @@ -1411,7 +1419,23 @@ def verify_ospf_database(tgen, topo, dut, input_dict, expected=True): rnode = tgen.routers()[dut] logger.info("Verifying OSPF interface on router %s:", dut) - show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True) + + if not rid: + rid = "self-originate" + if lsatype: + if vrf is None: + command = "show ip ospf database {} {} json".format(lsatype, rid) + else: + command = "show ip ospf database {} {} vrf {} json".format( + lsatype, rid, vrf + ) + else: + if vrf is None: + command = "show ip ospf database json" + else: + command = "show ip ospf database vrf {} json".format(vrf) + + show_ospf_json = run_frr_cmd(rnode, command, isjson=True) # Verifying output dictionary show_ospf_json is empty or not if not bool(show_ospf_json): errormsg = "OSPF is not running" @@ -1420,26 +1444,40 @@ def verify_ospf_database(tgen, topo, dut, input_dict, expected=True): # for inter and inter lsa's ospf_db_data = input_dict.setdefault("areas", None) ospf_external_lsa = input_dict.setdefault("AS External Link States", None) + # import pdb; pdb.set_trace() if ospf_db_data: for ospf_area, area_lsa in ospf_db_data.items(): - if ospf_area in show_ospf_json["areas"]: - if "Router Link States" in area_lsa: - for lsa in area_lsa["Router Link States"]: + if ospf_area in show_ospf_json["routerLinkStates"]["areas"]: + if "routerLinkStates" in area_lsa: + for lsa in area_lsa["routerLinkStates"]: + _advrtr = lsa.setdefault("advertisedRouter", None) + _options = lsa.setdefault("options", None) + if ( - lsa - in show_ospf_json["areas"][ospf_area]["Router Link States"] + _options + and lsa["lsaId"] + == show_ospf_json["routerLinkStates"]["areas"][ospf_area][ + 0 + ]["linkStateId"] + and lsa["options"] + == show_ospf_json["routerLinkStates"]["areas"][ospf_area][ + 0 + ]["options"] ): - logger.info( - "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s", - router, - ospf_area, - lsa, - ) result = True + break else: - errormsg = ( - "[DUT: {}] OSPF LSDB area {}: expected" - " Router LSA is {}".format(router, ospf_area, lsa) + errormsg = '[DUT: {}] OSPF LSA options: expected {}, Received Options are {} lsa["options"] {} OSPF LSAID: expected lsaid {}, Received lsaid {}'.format( + dut, + show_ospf_json["routerLinkStates"]["areas"][ospf_area][ + 0 + ]["options"], + _options, + lsa["options"], + show_ospf_json["routerLinkStates"]["areas"][ospf_area][ + 0 + ]["linkStateId"], + lsa["lsaId"], ) return errormsg if "Net Link States" in area_lsa: @@ -2489,3 +2527,495 @@ def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None): logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result + + +def get_ospf_database(tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None): + """ + This API is to return ospf lsa's by running + show ip ospf database command. + + Parameters + ---------- + * `tgen` : Topogen object + * `dut`: device under test + * `input_dict` : Input dict data, required when configuring from testcase + * `topo` : next to be verified + * `vrf` : vrf to be checked + * `lsatype` : type of lsa to be checked + * `rid` : router id for lsa to be checked + Usage + ----- + input_dict = { + "areas": { + "0.0.0.0": { + "routerLinkStates": { + "100.1.1.0-100.1.1.0": { + "LSID": "100.1.1.0", + "Advertised router": "100.1.1.0", + "LSA Age": 130, + "Sequence Number": "80000006", + "Checksum": "a703", + "Router links": 3 + } + }, + "networkLinkStates": { + "10.0.0.2-100.1.1.1": { + "LSID": "10.0.0.2", + "Advertised router": "100.1.1.1", + "LSA Age": 137, + "Sequence Number": "80000001", + "Checksum": "9583" + } + }, + }, + } + } + result = get_ospf_database(tgen, topo, dut, input_dict) + + Returns + ------- + True or False (Error Message) + """ + + result = False + router = dut + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + sleep(10) + if "ospf" not in topo["routers"][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut) + return errormsg + + rnode = tgen.routers()[dut] + + logger.info("Verifying OSPF interface on router %s:", dut) + if not rid: + rid = "self-originate" + if lsatype: + if vrf is None: + command = "show ip ospf database {} {} json".format(lsatype, rid) + else: + command = "show ip ospf database {} {} vrf {} json".format( + lsatype, rid, vrf + ) + else: + if vrf is None: + command = "show ip ospf database json" + else: + command = "show ip ospf database vrf {} json".format(vrf) + + show_ospf_json = run_frr_cmd(rnode, command, isjson=True) + # Verifying output dictionary show_ospf_json is empty or not + if not bool(show_ospf_json): + errormsg = "OSPF is not running" + return errormsg + + # for inter and inter lsa's + ospf_db_data = input_dict.setdefault("areas", None) + ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None) + + if ospf_db_data: + for ospf_area, area_lsa in ospf_db_data.items(): + if "areas" in show_ospf_json and ospf_area in show_ospf_json["areas"]: + if "routerLinkStates" in area_lsa: + for lsa in area_lsa["routerLinkStates"]: + for rtrlsa in show_ospf_json["areas"][ospf_area][ + "routerLinkStates" + ]: + _advrtr = lsa.setdefault("advertisedRouter", None) + _options = lsa.setdefault("options", None) + if ( + _advrtr + and lsa["lsaId"] == rtrlsa["lsaId"] + and lsa["advertisedRouter"] + == rtrlsa["advertisedRouter"] + ): + result = True + break + if ( + _options + and lsa["lsaId"] == rtrlsa["lsaId"] + and lsa["options"] == rtrlsa["options"] + ): + result = True + break + + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " Router LSA is {}\n found Router LSA: {}".format( + router, ospf_area, lsa, rtrlsa + ) + ) + return errormsg + + if "networkLinkStates" in area_lsa: + for lsa in area_lsa["networkLinkStates"]: + for netlsa in show_ospf_json["areas"][ospf_area][ + "networkLinkStates" + ]: + if ( + lsa + in show_ospf_json["areas"][ospf_area][ + "networkLinkStates" + ] + ): + if ( + lsa["lsaId"] == netlsa["lsaId"] + and lsa["advertisedRouter"] + == netlsa["advertisedRouter"] + ): + result = True + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " Network LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg + + if "summaryLinkStates" in area_lsa: + for lsa in area_lsa["summaryLinkStates"]: + for t3lsa in show_ospf_json["areas"][ospf_area][ + "summaryLinkStates" + ]: + if ( + lsa["lsaId"] == t3lsa["lsaId"] + and lsa["advertisedRouter"] == t3lsa["advertisedRouter"] + ): + result = True + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " Summary LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg + + if "nssaExternalLinkStates" in area_lsa: + for lsa in area_lsa["nssaExternalLinkStates"]: + for t7lsa in show_ospf_json["areas"][ospf_area][ + "nssaExternalLinkStates" + ]: + if ( + lsa["lsaId"] == t7lsa["lsaId"] + and lsa["advertisedRouter"] == t7lsa["advertisedRouter"] + ): + result = True + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " Type7 LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg + + if "asbrSummaryLinkStates" in area_lsa: + for lsa in area_lsa["asbrSummaryLinkStates"]: + for t4lsa in show_ospf_json["areas"][ospf_area][ + "asbrSummaryLinkStates" + ]: + if ( + lsa["lsaId"] == t4lsa["lsaId"] + and lsa["advertisedRouter"] == t4lsa["advertisedRouter"] + ): + result = True + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s", + router, + ospf_area, + lsa, + ) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " ASBR Summary LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg + + if "linkLocalOpaqueLsa" in area_lsa: + for lsa in area_lsa["linkLocalOpaqueLsa"]: + try: + for lnklsa in show_ospf_json["areas"][ospf_area][ + "linkLocalOpaqueLsa" + ]: + if ( + lsa["lsaId"] in lnklsa["lsaId"] + and "linkLocalOpaqueLsa" + in show_ospf_json["areas"][ospf_area] + ): + logger.info( + ( + "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA" + "%s", + ospf_area, + lsa, + ) + ) + result = True + else: + errormsg = ( + "[DUT: FRR] OSPF LSDB area: {} " + "expected Opaque-LSA is {}, Found is {}".format( + ospf_area, lsa, show_ospf_json + ) + ) + raise ValueError(errormsg) + return errormsg + except KeyError: + errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present" + return errormsg + else: + if "routerLinkStates" in area_lsa: + for lsa in area_lsa["routerLinkStates"]: + for rtrlsa in show_ospf_json["routerLinkStates"]: + _advrtr = lsa.setdefault("advertisedRouter", None) + _options = lsa.setdefault("options", None) + _age = lsa.setdefault("lsaAge", None) + if ( + _options + and lsa["options"] + == show_ospf_json["routerLinkStates"][rtrlsa][ + ospf_area + ][0]["options"] + ): + result = True + break + if ( + _age is not "get" + and lsa["lsaAge"] + == show_ospf_json["routerLinkStates"][rtrlsa][ + ospf_area + ][0]["lsaAge"] + ): + result = True + break + + if _age == "get": + return "{}".format( + show_ospf_json["routerLinkStates"][rtrlsa][ + ospf_area + ][0]["lsaAge"] + ) + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " Router LSA is {}\n found Router LSA: {}".format( + router, + ospf_area, + lsa, + show_ospf_json["routerLinkStates"], + ) + ) + return errormsg + + if "networkLinkStates" in area_lsa: + for lsa in area_lsa["networkLinkStates"]: + for netlsa in show_ospf_json["areas"][ospf_area][ + "networkLinkStates" + ]: + if ( + lsa + in show_ospf_json["areas"][ospf_area][ + "networkLinkStates" + ] + ): + if ( + lsa["lsaId"] == netlsa["lsaId"] + and lsa["advertisedRouter"] + == netlsa["advertisedRouter"] + ): + result = True + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " Network LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg + + if "summaryLinkStates" in area_lsa: + for lsa in area_lsa["summaryLinkStates"]: + for t3lsa in show_ospf_json["areas"][ospf_area][ + "summaryLinkStates" + ]: + if ( + lsa["lsaId"] == t3lsa["lsaId"] + and lsa["advertisedRouter"] == t3lsa["advertisedRouter"] + ): + result = True + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " Summary LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg + + if "nssaExternalLinkStates" in area_lsa: + for lsa in area_lsa["nssaExternalLinkStates"]: + for t7lsa in show_ospf_json["areas"][ospf_area][ + "nssaExternalLinkStates" + ]: + if ( + lsa["lsaId"] == t7lsa["lsaId"] + and lsa["advertisedRouter"] == t7lsa["advertisedRouter"] + ): + result = True + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s", + router, + ospf_area, + lsa, + ) + break + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " Type7 LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg + + if "asbrSummaryLinkStates" in area_lsa: + for lsa in area_lsa["asbrSummaryLinkStates"]: + for t4lsa in show_ospf_json["areas"][ospf_area][ + "asbrSummaryLinkStates" + ]: + if ( + lsa["lsaId"] == t4lsa["lsaId"] + and lsa["advertisedRouter"] == t4lsa["advertisedRouter"] + ): + result = True + break + if result: + logger.info( + "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s", + router, + ospf_area, + lsa, + ) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB area {}: expected" + " ASBR Summary LSA is {}".format(router, ospf_area, lsa) + ) + return errormsg + + if "linkLocalOpaqueLsa" in area_lsa: + for lsa in area_lsa["linkLocalOpaqueLsa"]: + try: + for lnklsa in show_ospf_json["areas"][ospf_area][ + "linkLocalOpaqueLsa" + ]: + if ( + lsa["lsaId"] in lnklsa["lsaId"] + and "linkLocalOpaqueLsa" + in show_ospf_json["areas"][ospf_area] + ): + logger.info( + ( + "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA" + "%s", + ospf_area, + lsa, + ) + ) + result = True + else: + errormsg = ( + "[DUT: FRR] OSPF LSDB area: {} " + "expected Opaque-LSA is {}, Found is {}".format( + ospf_area, lsa, show_ospf_json + ) + ) + raise ValueError(errormsg) + return errormsg + except KeyError: + errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present" + return errormsg + + if ospf_external_lsa: + for lsa in ospf_external_lsa: + try: + for t5lsa in show_ospf_json["asExternalLinkStates"]: + if ( + lsa["lsaId"] == t5lsa["lsaId"] + and lsa["advertisedRouter"] == t5lsa["advertisedRouter"] + ): + result = True + break + except KeyError: + result = False + if result: + logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa) + result = True + else: + errormsg = ( + "[DUT: {}] OSPF LSDB : expected" + " External LSA is {}".format(router, lsa) + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result |
