summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py247
1 files changed, 169 insertions, 78 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
index 9f3d4841b0..5bc9f14fea 100644
--- a/tests/topotests/lib/ospf.py
+++ b/tests/topotests/lib/ospf.py
@@ -62,7 +62,7 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru
"r1": {
"ospf": {
"router_id": "22.22.22.22",
- "area": [{ "id":0.0.0.0, "type": "nssa"}]
+ "area": [{ "id": "0.0.0.0", "type": "nssa"}]
}
}
@@ -94,7 +94,7 @@ def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=Tru
return result
-def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True):
+def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True, ospf="ospf"):
"""
Helper API to create ospf global configuration.
@@ -105,6 +105,33 @@ def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True
* `router` : router to be configured.
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
+ * `ospf` : either 'ospf' or 'ospf6'
+
+ Usage
+ -----
+ input_dict = {
+ "routers": {
+ "r1": {
+ "links": {
+ "r3": {
+ "ipv6": "2013:13::1/64",
+ "ospf6": {
+ "hello_interval": 1,
+ "dead_interval": 4,
+ "network": "point-to-point"
+ }
+ }
+ },
+ "ospf6": {
+ "router_id": "1.1.1.1",
+ "neighbors": {
+ "r3": {
+ "area": "1.1.1.1"
+ }
+ }
+ }
+ }
+ }
Returns
-------
@@ -115,17 +142,17 @@ def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True
logger.debug("Entering lib API: __create_ospf_global()")
try:
- ospf_data = input_dict[router]["ospf"]
+ ospf_data = input_dict[router][ospf]
del_ospf_action = ospf_data.setdefault("delete", False)
if del_ospf_action:
- config_data = ["no router ospf"]
+ config_data = ["no router {}".format(ospf)]
result = create_common_configuration(
- tgen, router, config_data, "ospf", build, load_config
+ tgen, router, config_data, ospf, build, load_config
)
return result
config_data = []
- cmd = "router ospf"
+ cmd = "router {}".format(ospf)
config_data.append(cmd)
@@ -133,9 +160,9 @@ def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True
router_id = ospf_data.setdefault("router_id", None)
del_router_id = ospf_data.setdefault("del_router_id", False)
if del_router_id:
- config_data.append("no ospf router-id")
+ config_data.append("no {} router-id".format(ospf))
if router_id:
- config_data.append("ospf router-id {}".format(router_id))
+ config_data.append("{} router-id {}".format(ospf, router_id))
# redistribute command
redistribute_data = ospf_data.setdefault("redistribute", {})
@@ -154,6 +181,7 @@ def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
+
# area information
area_data = ospf_data.setdefault("area", {})
if area_data:
@@ -172,9 +200,20 @@ def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- result = create_common_configuration(
- tgen, router, config_data, "ospf", build, load_config
- )
+
+ # area interface information for ospf6d only
+ if ospf == "ospf6":
+ area_iface = ospf_data.setdefault("neighbors", {})
+ if area_iface:
+ for neighbor in area_iface:
+ if "area" in area_iface[neighbor]:
+ iface = input_dict[router]["links"][neighbor]["interface"]
+ cmd = "interface {} area {}".format(
+ iface, area_iface[neighbor]["area"]
+ )
+ if area_iface[neighbor].setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
# summary information
summary_data = ospf_data.setdefault("summary-address", {})
@@ -200,8 +239,9 @@ def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
+
result = create_common_configuration(
- tgen, router, config_data, "ospf", build, load_config
+ tgen, router, config_data, ospf, build, load_config
)
except InvalidCLIError:
@@ -238,7 +278,7 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr
-------
True or False
"""
- logger.debug("Entering lib API: create_router_ospf()")
+ logger.debug("Entering lib API: create_router_ospf6()")
result = False
if not input_dict:
@@ -247,67 +287,15 @@ def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=Tr
topo = topo["routers"]
input_dict = deepcopy(input_dict)
for router in input_dict.keys():
- if "ospf" not in input_dict[router]:
- logger.debug("Router %s: 'ospf' not present in input_dict", router)
+ if "ospf6" not in input_dict[router]:
+ logger.debug("Router %s: 'ospf6' not present in input_dict", router)
continue
- result = __create_ospf_global(tgen, input_dict, router, build, load_config)
-
- logger.debug("Exiting lib API: create_router_ospf()")
- return result
-
-
-def __create_ospf6_global(tgen, input_dict, router, build=False, load_config=True):
- """
- Helper API to create ospf global configuration.
-
- Parameters
- ----------
- * `tgen` : Topogen object
- * `input_dict` : Input dict data, required when configuring from testcase
- * `router` : router id to be configured.
- * `build` : Only for initial setup phase this is set as True.
-
- Returns
- -------
- True or False
- """
-
- result = False
- logger.debug("Entering lib API: __create_ospf_global()")
- try:
-
- ospf_data = input_dict[router]["ospf6"]
- del_ospf_action = ospf_data.setdefault("delete", False)
- if del_ospf_action:
- config_data = ["no ipv6 router ospf"]
- result = create_common_configuration(
- tgen, router, config_data, "ospf", build, load_config
- )
- return result
-
- config_data = []
- cmd = "router ospf"
-
- config_data.append(cmd)
-
- router_id = ospf_data.setdefault("router_id", None)
- del_router_id = ospf_data.setdefault("del_router_id", False)
- if del_router_id:
- config_data.append("no ospf router-id")
- if router_id:
- config_data.append("ospf router-id {}".format(router_id))
-
- result = create_common_configuration(
- tgen, router, config_data, "ospf", build, load_config
+ result = __create_ospf_global(
+ tgen, input_dict, router, build, load_config, "ospf6"
)
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
- logger.debug("Exiting lib API: create_ospf_global()")
+ logger.debug("Exiting lib API: create_router_ospf6()")
return result
@@ -330,7 +318,7 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
"links": {
"r2": {
"ospf": {
- "authentication": 'message-digest',
+ "authentication": "message-digest",
"authentication-key": "ospf",
"message-digest-key": "10"
}
@@ -379,6 +367,7 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
if data_ospf_area:
cmd = "ip ospf area {}".format(data_ospf_area)
config_data.append(cmd)
+
# interface ospf auth
if data_ospf_auth:
if data_ospf_auth == "null":
@@ -464,6 +453,32 @@ def clear_ospf(tgen, router):
logger.debug("Exiting lib API: clear_ospf()")
+def redistribute_ospf(tgen, topo, dut, route_type, **kwargs):
+ """
+ Redstribution of routes inside ospf.
+
+ Parameters
+ ----------
+ * `tgen`: Topogen object
+ * `topo` : json file data
+ * `dut`: device under test
+ * `route_type`: "static" or "connected" or ....
+ * `kwargs`: pass extra information (see below)
+
+ Usage
+ -----
+ redistribute_ospf(tgen, topo, "r0", "static", delete=True)
+ redistribute_ospf(tgen, topo, "r0", "static", route_map="rmap_ipv4")
+ """
+
+ ospf_red = {dut: {"ospf": {"redistribute": [{"redist_type": route_type}]}}}
+ for k, v in kwargs.items():
+ ospf_red[dut]["ospf"]["redistribute"][0][k] = v
+
+ result = create_router_ospf(tgen, topo, ospf_red)
+ assert result is True, "Testcase : Failed \n Error: {}".format(result)
+
+
################################
# Verification procs
################################
@@ -525,7 +540,7 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
logger.info("Verifying OSPF neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(
- rnode, "show ip ospf neighbor all json", isjson=True
+ rnode, "show ip ospf neighbor all json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
@@ -658,6 +673,70 @@ def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
return result
+################################
+# Verification procs
+################################
+@retry(attempts=40, wait=2, return_is_str=True)
+def verify_ospf6_neighbor(tgen, topo):
+ """
+ This API is to verify ospf neighborship by running
+ show ip ospf neighbour command,
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+
+ Usage
+ -----
+ Check FULL neighbors.
+ verify_ospf_neighbor(tgen, topo)
+
+ result = verify_ospf_neighbor(tgen, topo)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ logger.debug("Entering lib API: verify_ospf6_neighbor()")
+ result = False
+ for router, rnode in tgen.routers().items():
+ if "ospf6" not in topo["routers"][router]:
+ continue
+
+ logger.info("Verifying OSPF6 neighborship on router %s:", router)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf6 neighbor json", isjson=True
+ )
+
+ if not show_ospf_json:
+ return "OSPF6 is not running"
+
+ ospf_nbr_list = topo["routers"][router]["ospf6"]["neighbors"]
+ no_of_peer = 0
+ for ospf_nbr in ospf_nbr_list:
+ ospf_nbr_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
+ for neighbor in show_ospf_json["neighbors"]:
+ if neighbor["neighborId"] == ospf_nbr_rid:
+ nh_state = neighbor["state"]
+ break
+ else:
+ return "[DUT: {}] OSPF6 peer {} missing".format(router, data_rid)
+
+ if nh_state == "Full":
+ no_of_peer += 1
+
+ if no_of_peer == len(ospf_nbr_list):
+ logger.info("[DUT: {}] OSPF6 is Converged".format(router))
+ result = True
+ else:
+ return "[DUT: {}] OSPF6 is not Converged".format(router)
+
+ logger.debug("Exiting API: verify_ospf6_neighbor()")
+ return result
+
+
@retry(attempts=21, wait=2, return_is_str=True)
def verify_ospf_rib(
tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
@@ -847,19 +926,23 @@ def verify_ospf_rib(
if "routeType" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: routeType missing"
- "for route {} in OSPF RIB \n".format(dut, st_rt)
+ " for route {} in OSPF RIB \n".format(
+ dut, st_rt
+ )
)
return errormsg
elif _rtype != ospf_rib_json[st_rt]["routeType"]:
errormsg = (
"[DUT: {}]: routeType mismatch"
- "for route {} in OSPF RIB \n".format(dut, st_rt)
+ " for route {} in OSPF RIB \n".format(
+ dut, st_rt
+ )
)
return errormsg
else:
logger.info(
- "DUT: {}]: Found routeType {}"
- "for route {}".format(dut, _rtype, st_rt)
+ "[DUT: {}]: Found routeType {}"
+ " for route {}".format(dut, _rtype, st_rt)
)
if tag:
if "tag" not in ospf_rib_json[st_rt]:
@@ -874,7 +957,11 @@ def verify_ospf_rib(
errormsg = (
"[DUT: {}]: tag value {}"
" is not matched for"
- " route {} in RIB \n".format(dut, _tag, st_rt,)
+ " route {} in RIB \n".format(
+ dut,
+ _tag,
+ st_rt,
+ )
)
return errormsg
@@ -891,7 +978,11 @@ def verify_ospf_rib(
errormsg = (
"[DUT: {}]: metric value "
"{} is not matched for "
- "route {} in RIB \n".format(dut, metric, st_rt,)
+ "route {} in RIB \n".format(
+ dut,
+ metric,
+ st_rt,
+ )
)
return errormsg