summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/common_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--tests/topotests/lib/common_config.py571
1 files changed, 570 insertions, 1 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index d72d0aa223..156a5f7ea4 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -933,6 +933,16 @@ def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
)
rnode.run(cmd)
+ if vni:
+ config_data.append("vrf {}".format(vrf["name"]))
+ cmd = "vni {}".format(vni)
+ config_data.append(cmd)
+
+ if del_vni:
+ config_data.append("vrf {}".format(vrf["name"]))
+ cmd = "no vni {}".format(del_vni)
+ config_data.append(cmd)
+
result = create_common_configuration(
tgen, c_router, config_data, "vrf", build=build
)
@@ -984,6 +994,34 @@ def create_interface_in_kernel(
rnode.run(cmd)
+def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False):
+ """
+ Cretae interfaces in kernel for ipv4/ipv6
+ Config is done in Linux Kernel:
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut` : Device for which interfaces to be added
+ * `intf_name` : interface name
+ * `ifaceaction` : False to shutdown and True to bringup the
+ ineterface
+ """
+
+ rnode = tgen.routers()[dut]
+
+ cmd = "ip link set dev"
+ if ifaceaction:
+ action = "up"
+ cmd = "{} {} {}".format(cmd, intf_name, action)
+ else:
+ action = "down"
+ cmd = "{} {} {}".format(cmd, intf_name, action)
+
+ logger.info("[DUT: %s]: Running command: %s", dut, cmd)
+ rnode.run(cmd)
+
+
def validate_ip_address(ip_address):
"""
Validates the type of ip address
@@ -1042,7 +1080,7 @@ def check_address_types(addr_type=None):
return addr_types
if addr_type not in addr_types:
- logger.error(
+ logger.debug(
"{} not in supported/configured address types {}".format(
addr_type, addr_types
)
@@ -1732,6 +1770,7 @@ def create_route_maps(tgen, input_dict, build=False):
set_action = set_data.setdefault("set_action", None)
nexthop = set_data.setdefault("nexthop", None)
origin = set_data.setdefault("origin", None)
+ ext_comm_list = set_data.setdefault("extcommunity", {})
# Local Preference
if local_preference:
@@ -1796,6 +1835,19 @@ def create_route_maps(tgen, input_dict, build=False):
logger.error("In large_comm_list 'id' not" " provided")
return False
+ if ext_comm_list:
+ rt = ext_comm_list.setdefault("rt", None)
+ del_comm = ext_comm_list.setdefault("delete", None)
+ if rt:
+ cmd = "set extcommunity rt {}".format(rt)
+ if del_comm:
+ cmd = "{} delete".format(cmd)
+
+ rmap_data.append(cmd)
+ else:
+ logger.debug("In ext_comm_list 'rt' not" " provided")
+ return False
+
# Weight
if weight:
rmap_data.append("set weight {}".format(weight))
@@ -2151,6 +2203,243 @@ def addKernelRoute(
return True
+def configure_vxlan(tgen, input_dict):
+ """
+ Add and configure vxlan
+
+ * `tgen`: tgen onject
+ * `input_dict` : data for vxlan config
+
+ Usage:
+ ------
+ input_dict= {
+ "dcg2":{
+ "vxlan":[{
+ "vxlan_name": "vxlan75100",
+ "vxlan_id": "75100",
+ "dstport": 4789,
+ "local_addr": "120.0.0.1",
+ "learning": "no",
+ "delete": True
+ }]
+ }
+ }
+
+ configure_vxlan(tgen, input_dict)
+
+ Returns:
+ -------
+ True or errormsg
+
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ router_list = tgen.routers()
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ if "vxlan" in input_dict[dut]:
+ for vxlan_dict in input_dict[dut]["vxlan"]:
+ cmd = "ip link "
+
+ del_vxlan = vxlan_dict.setdefault("delete", None)
+ vxlan_names = vxlan_dict.setdefault("vxlan_name", [])
+ vxlan_ids = vxlan_dict.setdefault("vxlan_id", [])
+ dstport = vxlan_dict.setdefault("dstport", None)
+ local_addr = vxlan_dict.setdefault("local_addr", None)
+ learning = vxlan_dict.setdefault("learning", None)
+
+ config_data = []
+ if vxlan_names and vxlan_ids:
+ for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids):
+ cmd = "ip link"
+
+ if del_vxlan:
+ cmd = "{} del {} type vxlan id {}".format(
+ cmd, vxlan_name, vxlan_id
+ )
+ else:
+ cmd = "{} add {} type vxlan id {}".format(
+ cmd, vxlan_name, vxlan_id
+ )
+
+ if dstport:
+ cmd = "{} dstport {}".format(cmd, dstport)
+
+ if local_addr:
+ ip_cmd = "ip addr add {} dev {}".format(
+ local_addr, vxlan_name
+ )
+ if del_vxlan:
+ ip_cmd = "ip addr del {} dev {}".format(
+ local_addr, vxlan_name
+ )
+
+ config_data.append(ip_cmd)
+
+ cmd = "{} local {}".format(cmd, local_addr)
+
+ if learning == "no":
+ cmd = "{} {} learning".format(cmd, learning)
+
+ elif learning == "yes":
+ cmd = "{} learning".format(cmd)
+
+ config_data.append(cmd)
+
+ try:
+ for _cmd in config_data:
+ logger.info("[DUT: %s]: Running command: %s", dut, _cmd)
+ rnode.run(_cmd)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
+def configure_brctl(tgen, topo, input_dict):
+ """
+ Add and configure brctl
+
+ * `tgen`: tgen onject
+ * `input_dict` : data for brctl config
+
+ Usage:
+ ------
+ input_dict= {
+ dut:{
+ "brctl": [{
+ "brctl_name": "br100",
+ "addvxlan": "vxlan75100",
+ "vrf": "RED",
+ "stp": "off"
+ }]
+ }
+ }
+
+ configure_brctl(tgen, topo, input_dict)
+
+ Returns:
+ -------
+ True or errormsg
+
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ router_list = tgen.routers()
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ if "brctl" in input_dict[dut]:
+ for brctl_dict in input_dict[dut]["brctl"]:
+
+ brctl_names = brctl_dict.setdefault("brctl_name", [])
+ addvxlans = brctl_dict.setdefault("addvxlan", [])
+ stp_values = brctl_dict.setdefault("stp", [])
+ vrfs = brctl_dict.setdefault("vrf", [])
+
+ ip_cmd = "ip link set"
+ for brctl_name, vxlan, vrf, stp in zip(
+ brctl_names, addvxlans, vrfs, stp_values
+ ):
+
+ ip_cmd_list = []
+ cmd = "ip link add name {} type bridge stp_state {}".format(brctl_name, stp)
+
+ logger.info("[DUT: %s]: Running command: %s", dut, cmd)
+ rnode.run(cmd)
+
+ ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name))
+
+ if vxlan:
+ cmd = "{} dev {} master {}".format(ip_cmd, vxlan, brctl_name)
+
+ logger.info("[DUT: %s]: Running command: %s", dut, cmd)
+ rnode.run(cmd)
+
+ ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))
+
+ if vrf:
+ ip_cmd_list.append(
+ "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
+ )
+
+ for intf_name, data in topo["routers"][dut]["links"].items():
+ if "vrf" not in data:
+ continue
+
+ if data["vrf"] == vrf:
+ ip_cmd_list.append(
+ "{} up dev {}".format(ip_cmd, data["interface"])
+ )
+
+ try:
+ for _ip_cmd in ip_cmd_list:
+ logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
+ rnode.run(_ip_cmd)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+def configure_interface_mac(tgen, input_dict):
+ """
+ Add and configure brctl
+
+ * `tgen`: tgen onject
+ * `input_dict` : data for mac config
+
+ input_mac= {
+ "edge1":{
+ "br75100": "00:80:48:BA:d1:00,
+ "br75200": "00:80:48:BA:d1:00
+ }
+ }
+
+ configure_interface_mac(tgen, input_mac)
+
+ Returns:
+ -------
+ True or errormsg
+
+ """
+
+ router_list = tgen.routers()
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ for intf, mac in input_dict[dut].items():
+ cmd = "ifconfig {} hw ether {}".format(intf, mac)
+ logger.info("[DUT: %s]: Running command: %s", dut, cmd)
+
+ try:
+ result = rnode.run(cmd)
+ if len(result) != 0:
+ return result
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ return True
+
+
#############################################
# Verification APIs
#############################################
@@ -2875,3 +3164,283 @@ def verify_create_community_list(tgen, input_dict):
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
+
+
+def verify_cli_json(tgen, input_dict):
+ """
+ API to verify if JSON is available for clis
+ command.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict`: CLIs for which JSON needs to be verified
+ Usage
+ -----
+ input_dict = {
+ "edge1":{
+ "cli": ["show evpn vni detail", show evpn rmac vni all]
+ }
+ }
+
+ result = verify_cli_json(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ for cli in input_dict[dut]["cli"]:
+ logger.info(
+ "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli
+ )
+
+ test_cli = "{} json".format(cli)
+ ret_json = rnode.vtysh_cmd(test_cli, isjson=True)
+ if not bool(ret_json):
+ errormsg = "CLI: %s, JSON format is not available" % (cli)
+ return errormsg
+ elif "unknown" in ret_json or "Unknown" in ret_json:
+ errormsg = "CLI: %s, JSON format is not available" % (cli)
+ return errormsg
+ else:
+ logger.info(
+ "CLI : %s JSON format is available: " "\n %s", cli, ret_json
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
+@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
+def verify_evpn_vni(tgen, input_dict):
+ """
+ API to verify evpn vni details using "show evpn vni detail json"
+ command.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict`: having details like - for which router, evpn details
+ needs to be verified
+ Usage
+ -----
+ input_dict = {
+ "edge1":{
+ "vni": [
+ {
+ "75100":{
+ "vrf": "RED",
+ "vxlanIntf": "vxlan75100",
+ "localVtepIp": "120.1.1.1",
+ "sviIntf": "br100"
+ }
+ }
+ ]
+ }
+ }
+
+ result = verify_evpn_vni(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying evpn vni details :", dut)
+
+ cmd = "show evpn vni detail json"
+ evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
+ if not bool(evpn_all_vni_json):
+ errormsg = "No output for '{}' cli".format(cmd)
+ return errormsg
+
+ if "vni" in input_dict[dut]:
+ for vni_dict in input_dict[dut]["vni"]:
+ found = False
+ vni = vni_dict["name"]
+ for evpn_vni_json in evpn_all_vni_json:
+ if "vni" in evpn_vni_json:
+ if evpn_vni_json["vni"] != int(vni):
+ continue
+
+ for attribute in vni_dict.keys():
+ if vni_dict[attribute] != evpn_vni_json[attribute]:
+ errormsg = (
+ "[DUT: %s] Verifying "
+ "%s for VNI: %s [FAILED]||"
+ ", EXPECTED : %s "
+ " FOUND : %s"
+ % (
+ dut,
+ attribute,
+ vni,
+ vni_dict[attribute],
+ evpn_vni_json[attribute],
+ )
+ )
+ return errormsg
+
+ else:
+ found = True
+ logger.info(
+ "[DUT: %s] Verifying"
+ " %s for VNI: %s , "
+ "Found Expected : %s ",
+ dut,
+ attribute,
+ vni,
+ evpn_vni_json[attribute],
+ )
+
+ if evpn_vni_json["state"] != "Up":
+ errormsg = (
+ "[DUT: %s] Failed: Verifying"
+ " State for VNI: %s is not Up" % (dut, vni)
+ )
+ return errormsg
+
+ else:
+ errormsg = (
+ "[DUT: %s] Failed:"
+ " VNI: %s is not present in JSON" % (dut, vni)
+ )
+ return errormsg
+
+ if found:
+ logger.info(
+ "[DUT %s]: Verifying VNI : %s "
+ "details and state is Up [PASSED]!!",
+ dut,
+ vni,
+ )
+ return True
+
+ else:
+ errormsg = (
+ "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return False
+
+
+@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
+def verify_vrf_vni(tgen, input_dict):
+ """
+ API to verify vrf vni details using "show vrf vni json"
+ command.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict`: having details like - for which router, evpn details
+ needs to be verified
+ Usage
+ -----
+ input_dict = {
+ "edge1":{
+ "vrfs": [
+ {
+ "RED":{
+ "vni": 75000,
+ "vxlanIntf": "vxlan75100",
+ "sviIntf": "br100",
+ "routerMac": "00:80:48:ba:d1:00",
+ "state": "Up"
+ }
+ }
+ ]
+ }
+ }
+
+ result = verify_vrf_vni(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying vrf vni details :", dut)
+
+ cmd = "show vrf vni json"
+ vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
+ if not bool(vrf_all_vni_json):
+ errormsg = "No output for '{}' cli".format(cmd)
+ return errormsg
+
+ if "vrfs" in input_dict[dut]:
+ for vrfs in input_dict[dut]["vrfs"]:
+ for vrf, vrf_dict in vrfs.items():
+ found = False
+ for vrf_vni_json in vrf_all_vni_json["vrfs"]:
+ if "vrf" in vrf_vni_json:
+ if vrf_vni_json["vrf"] != vrf:
+ continue
+
+ for attribute in vrf_dict.keys():
+ if vrf_dict[attribute] == vrf_vni_json[attribute]:
+ found = True
+ logger.info(
+ "[DUT %s]: VRF: %s, "
+ "verifying %s "
+ ", Found Expected: %s "
+ "[PASSED]!!",
+ dut,
+ vrf,
+ attribute,
+ vrf_vni_json[attribute],
+ )
+ else:
+ errormsg = (
+ "[DUT: %s] VRF: %s, "
+ "verifying %s [FAILED!!] "
+ ", EXPECTED : %s "
+ ", FOUND : %s"
+ % (
+ dut,
+ vrf,
+ attribute,
+ vrf_dict[attribute],
+ vrf_vni_json[attribute],
+ )
+ )
+ return errormsg
+
+ else:
+ errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % (
+ dut,
+ vrf,
+ )
+ return errormsg
+
+ if found:
+ logger.info(
+ "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
+ dut,
+ vrf,
+ )
+ return True
+
+ else:
+ errormsg = (
+ "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return False