summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py598
1 files changed, 564 insertions, 34 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index 59ae1a2559..7609c7f899 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -1,26 +1,14 @@
+# SPDX-License-Identifier: ISC
#
# Copyright (c) 2020 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
#
-# Permission to use, copy, modify, and/or distribute this software
-# for any purpose with or without fee is hereby granted, provided
-# that the above copyright notice and this permission notice appear
-# in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
-# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
-# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
-# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
-# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
-# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
-# OF THIS SOFTWARE.
-#
import ipaddress
import sys
from copy import deepcopy
+from time import sleep
# Import common_config to use commomnly used APIs
from lib.common_config import (
@@ -201,6 +189,24 @@ def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf):
cmd = "no maximum-paths"
config_data.append(cmd)
+ # Flood reduction.
+ flood_data = ospf_data.setdefault("flood-reduction", {})
+ if flood_data:
+ cmd = "flood-reduction"
+ del_action = ospf_data.setdefault("del_flood_reduction", False)
+ if del_action:
+ cmd = "no flood-reduction"
+ config_data.append(cmd)
+
+ # LSA refresh timer - A hidden command.
+ refresh_data = ospf_data.setdefault("lsa-refresh", {})
+ if refresh_data:
+ cmd = "ospf lsa-refresh {}".format(refresh_data)
+ del_action = ospf_data.setdefault("del_lsa_refresh", False)
+ if del_action:
+ cmd = "no ospf lsa-refresh"
+ config_data.append(cmd)
+
# redistribute command
redistribute_data = ospf_data.setdefault("redistribute", {})
if redistribute_data:
@@ -233,6 +239,9 @@ def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf):
if "type" in area:
cmd = cmd + " {}".format(area["type"])
+ if "flood-reduction" in area:
+ cmd = cmd + " flood-reduction"
+
del_action = area.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
@@ -737,9 +746,6 @@ def verify_ospf_neighbor(
return result
-################################
-# Verification procs
-################################
@retry(retry_timeout=50)
def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False):
"""
@@ -1352,8 +1358,10 @@ def verify_ospf_interface(
return result
-@retry(retry_timeout=20)
-def verify_ospf_database(tgen, topo, dut, input_dict, expected=True):
+@retry(retry_timeout=40)
+def verify_ospf_database(
+ tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None, expected=True
+):
"""
This API is to verify ospf lsa's by running
show ip ospf database command.
@@ -1411,7 +1419,23 @@ def verify_ospf_database(tgen, topo, dut, input_dict, expected=True):
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
- show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True)
+
+ if not rid:
+ rid = "self-originate"
+ if lsatype:
+ if vrf is None:
+ command = "show ip ospf database {} {} json".format(lsatype, rid)
+ else:
+ command = "show ip ospf database {} {} vrf {} json".format(
+ lsatype, rid, vrf
+ )
+ else:
+ if vrf is None:
+ command = "show ip ospf database json"
+ else:
+ command = "show ip ospf database vrf {} json".format(vrf)
+
+ show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
@@ -1420,26 +1444,40 @@ def verify_ospf_database(tgen, topo, dut, input_dict, expected=True):
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
ospf_external_lsa = input_dict.setdefault("AS External Link States", None)
+ # import pdb; pdb.set_trace()
if ospf_db_data:
for ospf_area, area_lsa in ospf_db_data.items():
- if ospf_area in show_ospf_json["areas"]:
- if "Router Link States" in area_lsa:
- for lsa in area_lsa["Router Link States"]:
+ if ospf_area in show_ospf_json["routerLinkStates"]["areas"]:
+ if "routerLinkStates" in area_lsa:
+ for lsa in area_lsa["routerLinkStates"]:
+ _advrtr = lsa.setdefault("advertisedRouter", None)
+ _options = lsa.setdefault("options", None)
+
if (
- lsa
- in show_ospf_json["areas"][ospf_area]["Router Link States"]
+ _options
+ and lsa["lsaId"]
+ == show_ospf_json["routerLinkStates"]["areas"][ospf_area][
+ 0
+ ]["linkStateId"]
+ and lsa["options"]
+ == show_ospf_json["routerLinkStates"]["areas"][ospf_area][
+ 0
+ ]["options"]
):
- logger.info(
- "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
- router,
- ospf_area,
- lsa,
- )
result = True
+ break
else:
- errormsg = (
- "[DUT: {}] OSPF LSDB area {}: expected"
- " Router LSA is {}".format(router, ospf_area, lsa)
+ errormsg = '[DUT: {}] OSPF LSA options: expected {}, Received Options are {} lsa["options"] {} OSPF LSAID: expected lsaid {}, Received lsaid {}'.format(
+ dut,
+ show_ospf_json["routerLinkStates"]["areas"][ospf_area][
+ 0
+ ]["options"],
+ _options,
+ lsa["options"],
+ show_ospf_json["routerLinkStates"]["areas"][ospf_area][
+ 0
+ ]["linkStateId"],
+ lsa["lsaId"],
)
return errormsg
if "Net Link States" in area_lsa:
@@ -2489,3 +2527,495 @@ def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
+
+
+def get_ospf_database(tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None):
+ """
+ This API is to return ospf lsa's by running
+ show ip ospf database command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `topo` : next to be verified
+ * `vrf` : vrf to be checked
+ * `lsatype` : type of lsa to be checked
+ * `rid` : router id for lsa to be checked
+ Usage
+ -----
+ input_dict = {
+ "areas": {
+ "0.0.0.0": {
+ "routerLinkStates": {
+ "100.1.1.0-100.1.1.0": {
+ "LSID": "100.1.1.0",
+ "Advertised router": "100.1.1.0",
+ "LSA Age": 130,
+ "Sequence Number": "80000006",
+ "Checksum": "a703",
+ "Router links": 3
+ }
+ },
+ "networkLinkStates": {
+ "10.0.0.2-100.1.1.1": {
+ "LSID": "10.0.0.2",
+ "Advertised router": "100.1.1.1",
+ "LSA Age": 137,
+ "Sequence Number": "80000001",
+ "Checksum": "9583"
+ }
+ },
+ },
+ }
+ }
+ result = get_ospf_database(tgen, topo, dut, input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ result = False
+ router = dut
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ sleep(10)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("Verifying OSPF interface on router %s:", dut)
+ if not rid:
+ rid = "self-originate"
+ if lsatype:
+ if vrf is None:
+ command = "show ip ospf database {} {} json".format(lsatype, rid)
+ else:
+ command = "show ip ospf database {} {} vrf {} json".format(
+ lsatype, rid, vrf
+ )
+ else:
+ if vrf is None:
+ command = "show ip ospf database json"
+ else:
+ command = "show ip ospf database vrf {} json".format(vrf)
+
+ show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ # for inter and inter lsa's
+ ospf_db_data = input_dict.setdefault("areas", None)
+ ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
+
+ if ospf_db_data:
+ for ospf_area, area_lsa in ospf_db_data.items():
+ if "areas" in show_ospf_json and ospf_area in show_ospf_json["areas"]:
+ if "routerLinkStates" in area_lsa:
+ for lsa in area_lsa["routerLinkStates"]:
+ for rtrlsa in show_ospf_json["areas"][ospf_area][
+ "routerLinkStates"
+ ]:
+ _advrtr = lsa.setdefault("advertisedRouter", None)
+ _options = lsa.setdefault("options", None)
+ if (
+ _advrtr
+ and lsa["lsaId"] == rtrlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == rtrlsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if (
+ _options
+ and lsa["lsaId"] == rtrlsa["lsaId"]
+ and lsa["options"] == rtrlsa["options"]
+ ):
+ result = True
+ break
+
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Router LSA is {}\n found Router LSA: {}".format(
+ router, ospf_area, lsa, rtrlsa
+ )
+ )
+ return errormsg
+
+ if "networkLinkStates" in area_lsa:
+ for lsa in area_lsa["networkLinkStates"]:
+ for netlsa in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]
+ ):
+ if (
+ lsa["lsaId"] == netlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == netlsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Network LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "summaryLinkStates" in area_lsa:
+ for lsa in area_lsa["summaryLinkStates"]:
+ for t3lsa in show_ospf_json["areas"][ospf_area][
+ "summaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t3lsa["lsaId"]
+ and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "nssaExternalLinkStates" in area_lsa:
+ for lsa in area_lsa["nssaExternalLinkStates"]:
+ for t7lsa in show_ospf_json["areas"][ospf_area][
+ "nssaExternalLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t7lsa["lsaId"]
+ and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Type7 LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "asbrSummaryLinkStates" in area_lsa:
+ for lsa in area_lsa["asbrSummaryLinkStates"]:
+ for t4lsa in show_ospf_json["areas"][ospf_area][
+ "asbrSummaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t4lsa["lsaId"]
+ and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "linkLocalOpaqueLsa" in area_lsa:
+ for lsa in area_lsa["linkLocalOpaqueLsa"]:
+ try:
+ for lnklsa in show_ospf_json["areas"][ospf_area][
+ "linkLocalOpaqueLsa"
+ ]:
+ if (
+ lsa["lsaId"] in lnklsa["lsaId"]
+ and "linkLocalOpaqueLsa"
+ in show_ospf_json["areas"][ospf_area]
+ ):
+ logger.info(
+ (
+ "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
+ "%s",
+ ospf_area,
+ lsa,
+ )
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: FRR] OSPF LSDB area: {} "
+ "expected Opaque-LSA is {}, Found is {}".format(
+ ospf_area, lsa, show_ospf_json
+ )
+ )
+ raise ValueError(errormsg)
+ return errormsg
+ except KeyError:
+ errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
+ return errormsg
+ else:
+ if "routerLinkStates" in area_lsa:
+ for lsa in area_lsa["routerLinkStates"]:
+ for rtrlsa in show_ospf_json["routerLinkStates"]:
+ _advrtr = lsa.setdefault("advertisedRouter", None)
+ _options = lsa.setdefault("options", None)
+ _age = lsa.setdefault("lsaAge", None)
+ if (
+ _options
+ and lsa["options"]
+ == show_ospf_json["routerLinkStates"][rtrlsa][
+ ospf_area
+ ][0]["options"]
+ ):
+ result = True
+ break
+ if (
+ _age is not "get"
+ and lsa["lsaAge"]
+ == show_ospf_json["routerLinkStates"][rtrlsa][
+ ospf_area
+ ][0]["lsaAge"]
+ ):
+ result = True
+ break
+
+ if _age == "get":
+ return "{}".format(
+ show_ospf_json["routerLinkStates"][rtrlsa][
+ ospf_area
+ ][0]["lsaAge"]
+ )
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Router LSA is {}\n found Router LSA: {}".format(
+ router,
+ ospf_area,
+ lsa,
+ show_ospf_json["routerLinkStates"],
+ )
+ )
+ return errormsg
+
+ if "networkLinkStates" in area_lsa:
+ for lsa in area_lsa["networkLinkStates"]:
+ for netlsa in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]
+ ):
+ if (
+ lsa["lsaId"] == netlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == netlsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Network LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "summaryLinkStates" in area_lsa:
+ for lsa in area_lsa["summaryLinkStates"]:
+ for t3lsa in show_ospf_json["areas"][ospf_area][
+ "summaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t3lsa["lsaId"]
+ and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "nssaExternalLinkStates" in area_lsa:
+ for lsa in area_lsa["nssaExternalLinkStates"]:
+ for t7lsa in show_ospf_json["areas"][ospf_area][
+ "nssaExternalLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t7lsa["lsaId"]
+ and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Type7 LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "asbrSummaryLinkStates" in area_lsa:
+ for lsa in area_lsa["asbrSummaryLinkStates"]:
+ for t4lsa in show_ospf_json["areas"][ospf_area][
+ "asbrSummaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t4lsa["lsaId"]
+ and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "linkLocalOpaqueLsa" in area_lsa:
+ for lsa in area_lsa["linkLocalOpaqueLsa"]:
+ try:
+ for lnklsa in show_ospf_json["areas"][ospf_area][
+ "linkLocalOpaqueLsa"
+ ]:
+ if (
+ lsa["lsaId"] in lnklsa["lsaId"]
+ and "linkLocalOpaqueLsa"
+ in show_ospf_json["areas"][ospf_area]
+ ):
+ logger.info(
+ (
+ "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
+ "%s",
+ ospf_area,
+ lsa,
+ )
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: FRR] OSPF LSDB area: {} "
+ "expected Opaque-LSA is {}, Found is {}".format(
+ ospf_area, lsa, show_ospf_json
+ )
+ )
+ raise ValueError(errormsg)
+ return errormsg
+ except KeyError:
+ errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
+ return errormsg
+
+ if ospf_external_lsa:
+ for lsa in ospf_external_lsa:
+ try:
+ for t5lsa in show_ospf_json["asExternalLinkStates"]:
+ if (
+ lsa["lsaId"] == t5lsa["lsaId"]
+ and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ except KeyError:
+ result = False
+ if result:
+ logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB : expected"
+ " External LSA is {}".format(router, lsa)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result