summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/pim.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/pim.py')
-rw-r--r--tests/topotests/lib/pim.py1123
1 files changed, 1084 insertions, 39 deletions
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py
index 03ab02460f..7a57af7dbf 100644
--- a/tests/topotests/lib/pim.py
+++ b/tests/topotests/lib/pim.py
@@ -37,6 +37,7 @@ from lib.common_config import (
retry,
run_frr_cmd,
validate_ip_address,
+ get_frr_ipv6_linklocal,
)
from lib.micronet import get_exec_path
from lib.topolog import logger
@@ -48,7 +49,7 @@ CWD = os.path.dirname(os.path.realpath(__file__))
def create_pim_config(tgen, topo, input_dict=None, build=False, load_config=True):
"""
- API to configure pim/pimv6 on router
+ API to configure pim/pim6 on router
Parameters
----------
@@ -149,7 +150,7 @@ def _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict):
if "rp" in input_dict[router]["pim"]:
rp_data += pim_data["rp"]
- # PIMv6
+ # pim6
pim6_data = None
if "pim6" in input_dict[router]:
pim6_data = input_dict[router]["pim6"]
@@ -370,7 +371,7 @@ def create_igmp_config(tgen, topo, input_dict=None, build=False):
def create_mld_config(tgen, topo, input_dict=None, build=False):
"""
- API to configure mld for PIMv6 on router
+ API to configure mld for pim6 on router
Parameters
----------
@@ -515,6 +516,19 @@ def _enable_disable_pim_config(tgen, topo, input_dict, router, build=False):
config_data.append(cmd)
config_data.append("ip pim")
+ if "pim" in input_dict[router]:
+ if "disable" in input_dict[router]["pim"]:
+ enable_flag = False
+ interfaces = input_dict[router]["pim"]["disable"]
+
+ if type(interfaces) is not list:
+ interfaces = [interfaces]
+
+ for interface in interfaces:
+ cmd = "interface {}".format(interface)
+ config_data.append(cmd)
+ config_data.append("no ip pim")
+
if "pim6" in data and data["pim6"] == "enable":
# Loopback interfaces
if "type" in data and data["type"] == "loopback":
@@ -526,6 +540,19 @@ def _enable_disable_pim_config(tgen, topo, input_dict, router, build=False):
config_data.append(cmd)
config_data.append("ipv6 pim")
+ if "pim6" in input_dict[router]:
+ if "disable" in input_dict[router]["pim6"]:
+ enable_flag = False
+ interfaces = input_dict[router]["pim6"]["disable"]
+
+ if type(interfaces) is not list:
+ interfaces = [interfaces]
+
+ for interface in interfaces:
+ cmd = "interface {}".format(interface)
+ config_data.append(cmd)
+ config_data.append("no ipv6 pim")
+
# pim global config
if "pim" in input_dict[router]:
pim_data = input_dict[router]["pim"]
@@ -797,6 +824,134 @@ def verify_pim_neighbors(tgen, topo, dut=None, iface=None, nbr_ip=None, expected
return True
+@retry(retry_timeout=12)
+def verify_pim6_neighbors(tgen, topo, dut=None, iface=None, nbr_ip=None, expected=True):
+ """
+ Verify all pim6 neighbors are up and running, config is verified
+ using "show ipv6 pim neighbor" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : dut info
+ * `iface` : link for which PIM nbr need to check
+ * `nbr_ip` : neighbor ip of interface
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ result = verify_pim6_neighbors(tgen, topo, dut, iface=ens192, nbr_ip=20.1.1.2)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in tgen.routers():
+ if dut is not None and dut != router:
+ continue
+
+ rnode = tgen.routers()[router]
+ show_ip_pim_neighbor_json = rnode.vtysh_cmd(
+ "show ipv6 pim neighbor json", isjson=True
+ )
+
+ for destLink, data in topo["routers"][router]["links"].items():
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if iface is not None and iface != data["interface"]:
+ continue
+
+ if "pim6" not in data:
+ continue
+
+ if "pim6" in data and data["pim6"] == "disable":
+ continue
+
+ if "pim6" in data and data["pim6"] == "enable":
+ local_interface = data["interface"]
+
+ if "-" in destLink:
+ # Spliting and storing destRouterLink data in tempList
+ tempList = destLink.split("-")
+
+ # destRouter
+ destLink = tempList.pop(0)
+
+ # Current Router Link
+ tempList.insert(0, router)
+ curRouter = "-".join(tempList)
+ else:
+ curRouter = router
+ if destLink not in topo["routers"]:
+ continue
+ data = topo["routers"][destLink]["links"][curRouter]
+ peer_interface = data["interface"]
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if "pim6" not in data:
+ continue
+
+ logger.info("[DUT: %s]: Verifying PIM neighbor status:", router)
+
+ if "pim6" in data and data["pim6"] == "enable":
+ pim_nh_intf_ip = get_frr_ipv6_linklocal(tgen, destLink, peer_interface)
+
+ # Verifying PIM neighbor
+ if local_interface in show_ip_pim_neighbor_json:
+ if show_ip_pim_neighbor_json[local_interface]:
+ if (
+ show_ip_pim_neighbor_json[local_interface][pim_nh_intf_ip][
+ "neighbor"
+ ]
+ != pim_nh_intf_ip
+ ):
+ errormsg = (
+ "[DUT %s]: Local interface: %s, PIM6"
+ " neighbor check failed "
+ "Expected neighbor: %s, Found neighbor:"
+ " %s"
+ % (
+ router,
+ local_interface,
+ pim_nh_intf_ip,
+ show_ip_pim_neighbor_json[local_interface][
+ pim_nh_intf_ip
+ ]["neighbor"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Local interface: %s, Found"
+ " expected PIM6 neighbor %s",
+ router,
+ local_interface,
+ pim_nh_intf_ip,
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Local interface: %s, and"
+ "interface ip: %s is not found in "
+ "PIM6 neighbor " % (router, local_interface, pim_nh_intf_ip)
+ )
+ return errormsg
+ else:
+ errormsg = (
+ "[DUT %s]: Local interface: %s, is not "
+ "present in PIM6 neighbor " % (router, local_interface)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
@retry(retry_timeout=40, diag_pct=0)
def verify_igmp_groups(tgen, dut, interface, group_addresses, expected=True):
"""
@@ -871,7 +1026,7 @@ def verify_igmp_groups(tgen, dut, interface, group_addresses, expected=True):
return True
-@retry(retry_timeout=60, diag_pct=0)
+@retry(retry_timeout=60, diag_pct=2)
def verify_upstream_iif(
tgen,
dut,
@@ -879,7 +1034,9 @@ def verify_upstream_iif(
src_address,
group_addresses,
joinState=None,
+ regState=None,
refCount=1,
+ addr_type="ipv4",
expected=True,
):
"""
@@ -910,7 +1067,6 @@ def verify_upstream_iif(
-------
errormsg(str) or True
"""
-
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if dut not in tgen.routers():
@@ -919,7 +1075,8 @@ def verify_upstream_iif(
rnode = tgen.routers()[dut]
logger.info(
- "[DUT: %s]: Verifying upstream Inbound Interface" " for IGMP groups received:",
+ "[DUT: %s]: Verifying upstream Inbound Interface"
+ " for IGMP/MLD groups received:",
dut,
)
@@ -1010,14 +1167,33 @@ def verify_upstream_iif(
)
return errormsg
+ if regState:
+ if group_addr_json[src_address]["regState"] != regState:
+ errormsg = (
+ "[DUT %s]: Verifying iif "
+ "(Inbound Interface) for (%s,%s) and"
+ " rejstate :%s [FAILED]!! "
+ " Expected: %s, Found: %s"
+ % (
+ dut,
+ src_address,
+ grp_addr,
+ group_addr_json[src_address]["regState"],
+ in_interface,
+ group_addr_json[src_address]["inboundInterface"],
+ )
+ )
+ return errormsg
+
logger.info(
"[DUT %s]: Verifying iif(Inbound Interface)"
- " for (%s,%s) and joinState is %s [PASSED]!! "
+ " for (%s,%s) and joinState is %s regstate is %s [PASSED]!! "
" Found Expected: (%s)",
dut,
src_address,
grp_addr,
group_addr_json[src_address]["joinState"],
+ group_addr_json[src_address]["regState"],
group_addr_json[src_address]["inboundInterface"],
)
if not found:
@@ -1042,7 +1218,7 @@ def verify_upstream_iif(
@retry(retry_timeout=12)
def verify_join_state_and_timer(
- tgen, dut, iif, src_address, group_addresses, expected=True
+ tgen, dut, iif, src_address, group_addresses, addr_type="ipv4", expected=True
):
"""
Verify join state is updated correctly and join timer is
@@ -1178,6 +1354,7 @@ def verify_mroutes(
oil,
return_uptime=False,
mwait=0,
+ addr_type="ipv4",
expected=True,
):
"""
@@ -1393,6 +1570,7 @@ def verify_pim_rp_info(
rp=None,
source=None,
iamrp=None,
+ addr_type="ipv4",
expected=True,
):
"""
@@ -1578,6 +1756,7 @@ def verify_pim_state(
group_addresses,
src_address=None,
installed_fl=None,
+ addr_type="ipv4",
expected=True,
):
"""
@@ -1697,7 +1876,7 @@ def verify_pim_state(
def get_pim_interface_traffic(tgen, input_dict):
"""
- get ip pim interface traffice by running
+ get ip pim interface traffic by running
"show ip pim interface traffic" cli
Parameters
@@ -1768,9 +1947,82 @@ def get_pim_interface_traffic(tgen, input_dict):
return output_dict
+def get_pim6_interface_traffic(tgen, input_dict):
+ """
+ get ipv6 pim interface traffic by running
+ "show ipv6 pim interface traffic" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict(dict)`: defines DUT, what and from which interfaces
+ traffic needs to be retrieved
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "r1-r0-eth0": {
+ "helloRx": 0,
+ "helloTx": 1,
+ "joinRx": 0,
+ "joinTx": 0
+ }
+ }
+ }
+
+ result = get_pim_interface_traffic(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ output_dict = {}
+ for dut in input_dict.keys():
+ if dut not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying pim interface traffic", dut)
+
+ def show_pim_intf_traffic(rnode, dut, input_dict, output_dict):
+ show_pim_intf_traffic_json = run_frr_cmd(
+ rnode, "show ipv6 pim interface traffic json", isjson=True
+ )
+
+ output_dict[dut] = {}
+ for intf, data in input_dict[dut].items():
+ interface_json = show_pim_intf_traffic_json[intf]
+ for state in data:
+
+ # Verify Tx/Rx
+ if state in interface_json:
+ output_dict[dut][state] = interface_json[state]
+ else:
+ errormsg = (
+ "[DUT %s]: %s is not present"
+ "for interface %s [FAILED]!! " % (dut, state, intf)
+ )
+ return errormsg
+ return None
+
+ test_func = functools.partial(
+ show_pim_intf_traffic, rnode, dut, input_dict, output_dict
+ )
+ (result, out) = topotest.run_and_expect(test_func, None, count=20, wait=1)
+ if not result:
+ return out
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return output_dict
+
+
@retry(retry_timeout=40, diag_pct=0)
def verify_pim_interface(
- tgen, topo, dut, interface=None, interface_ip=None, expected=True
+ tgen, topo, dut, interface=None, interface_ip=None, addr_type="ipv4", expected=True
):
"""
Verify all PIM interface are up and running, config is verified
@@ -1803,29 +2055,48 @@ def verify_pim_interface(
logger.info("[DUT: %s]: Verifying PIM interface status:", dut)
rnode = tgen.routers()[dut]
- show_ip_pim_interface_json = rnode.vtysh_cmd(
- "show ip pim interface json", isjson=True
+
+ if addr_type == "ipv4":
+ addr_cmd = "ip"
+ pim_cmd = "pim"
+ elif addr_type == "ipv6":
+ addr_cmd = "ipv6"
+ pim_cmd = "pim6"
+ show_pim_interface_json = rnode.vtysh_cmd(
+ "show {} pim interface json".format(addr_cmd), isjson=True
)
- logger.info("show_ip_pim_interface_json: \n %s", show_ip_pim_interface_json)
+ logger.info("show_pim_interface_json: \n %s", show_pim_interface_json)
if interface_ip:
- if interface in show_ip_pim_interface_json:
- pim_intf_json = show_ip_pim_interface_json[interface]
+ if interface in show_pim_interface_json:
+ pim_intf_json = show_pim_interface_json[interface]
if pim_intf_json["address"] != interface_ip:
errormsg = (
- "[DUT %s]: PIM interface "
- "ip is not correct "
+ "[DUT %s]: %s interface "
+ "%s is not correct "
"[FAILED]!! Expected : %s, Found : %s"
- % (dut, pim_intf_json["address"], interface_ip)
+ % (
+ dut,
+ pim_cmd,
+ addr_cmd,
+ pim_intf_json["address"],
+ interface_ip,
+ )
)
return errormsg
else:
logger.info(
- "[DUT %s]: PIM interface "
- "ip is correct "
+ "[DUT %s]: %s interface "
+ "%s is correct "
"[Passed]!! Expected : %s, Found : %s"
- % (dut, pim_intf_json["address"], interface_ip)
+ % (
+ dut,
+ pim_cmd,
+ addr_cmd,
+ pim_intf_json["address"],
+ interface_ip,
+ )
)
return True
else:
@@ -1833,17 +2104,17 @@ def verify_pim_interface(
if "type" in data and data["type"] == "loopback":
continue
- if "pim" in data and data["pim"] == "enable":
+ if pim_cmd in data and data[pim_cmd] == "enable":
pim_interface = data["interface"]
- pim_intf_ip = data["ipv4"].split("/")[0]
+ pim_intf_ip = data[addr_type].split("/")[0]
- if pim_interface in show_ip_pim_interface_json:
- pim_intf_json = show_ip_pim_interface_json[pim_interface]
+ if pim_interface in show_pim_interface_json:
+ pim_intf_json = show_pim_interface_json[pim_interface]
else:
errormsg = (
- "[DUT %s]: PIM interface: %s "
- "PIM interface ip: %s, not Found"
- % (dut, pim_interface, pim_intf_ip)
+ "[DUT %s]: %s interface: %s "
+ "PIM interface %s: %s, not Found"
+ % (dut, pim_cmd, pim_interface, addr_cmd, pim_intf_ip)
)
return errormsg
@@ -1853,12 +2124,14 @@ def verify_pim_interface(
and pim_intf_json["state"] != "up"
):
errormsg = (
- "[DUT %s]: PIM interface: %s "
- "PIM interface ip: %s, status check "
+ "[DUT %s]: %s interface: %s "
+ "PIM interface %s: %s, status check "
"[FAILED]!! Expected : %s, Found : %s"
% (
dut,
+ pim_cmd,
pim_interface,
+ addr_cmd,
pim_intf_ip,
pim_interface,
pim_intf_json["state"],
@@ -1867,11 +2140,13 @@ def verify_pim_interface(
return errormsg
logger.info(
- "[DUT %s]: PIM interface: %s, "
- "interface ip: %s, status: %s"
+ "[DUT %s]: %s interface: %s, "
+ "interface %s: %s, status: %s"
" [PASSED]!!",
dut,
+ pim_cmd,
pim_interface,
+ addr_cmd,
pim_intf_ip,
pim_intf_json["state"],
)
@@ -1882,8 +2157,8 @@ def verify_pim_interface(
def clear_pim_interface_traffic(tgen, topo):
"""
- Clear ip/ipv6 pim interface traffice by running
- "clear ip/ipv6 pim interface traffic" cli
+ Clear ip pim interface traffic by running
+ "clear ip pim interface traffic" cli
Parameters
----------
@@ -1914,6 +2189,74 @@ def clear_pim_interface_traffic(tgen, topo):
return True
+def clear_pim6_interface_traffic(tgen, topo):
+ """
+ Clear ipv6 pim interface traffic by running
+ "clear ipv6 pim interface traffic" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ Usage
+ -----
+
+ result = clear_pim6_interface_traffic(tgen, topo)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in tgen.routers():
+ if "pim" not in topo["routers"][dut]:
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Clearing pim6 interface traffic", dut)
+ result = run_frr_cmd(rnode, "clear ipv6 pim interface traffic")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
+def clear_pim6_interfaces(tgen, topo):
+ """
+ Clear ipv6 pim interface by running
+ "clear ipv6 pim interface" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ Usage
+ -----
+
+ result = clear_pim6_interfaces(tgen, topo)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in tgen.routers():
+ if "pim" not in topo["routers"][dut]:
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Clearing pim6 interfaces", dut)
+ result = run_frr_cmd(rnode, "clear ipv6 pim interface")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
def clear_pim_interfaces(tgen, dut):
"""
Clear ip/ipv6 pim interface by running
@@ -1961,8 +2304,8 @@ def clear_pim_interfaces(tgen, dut):
# Waiting for maximum 60 sec
fail_intf = []
for retry in range(1, 13):
- logger.info("[DUT: %s]: Waiting for 5 sec for PIM neighbors" " to come up", dut)
sleep(5)
+ logger.info("[DUT: %s]: Waiting for 5 sec for PIM neighbors" " to come up", dut)
run_json_after = run_frr_cmd(rnode, "show ip pim neighbor json", isjson=True)
found = True
for pim_intf in nh_before_clear.keys():
@@ -2212,6 +2555,35 @@ def clear_mroute(tgen, dut=None):
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+def clear_pim6_mroute(tgen, dut=None):
+ """
+ Clear ipv6 mroute by running "clear ipv6 mroute" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test, default None
+
+ Usage
+ -----
+ clear_mroute(tgen, dut)
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ router_list = tgen.routers()
+ for router, rnode in router_list.items():
+ if dut is not None and router != dut:
+ continue
+
+ logger.debug("[DUT: %s]: Clearing ipv6 mroute", router)
+ rnode.vtysh_cmd("clear ipv6 mroute")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
def reconfig_interfaces(tgen, topo, senderRouter, receiverRouter, packet=None):
"""
Configure interface ip for sender and receiver routers
@@ -2812,7 +3184,14 @@ def enable_disable_pim_bsm(tgen, router, intf, enable=True):
@retry(retry_timeout=60, diag_pct=0)
def verify_pim_join(
- tgen, topo, dut, interface, group_addresses, src_address=None, expected=True
+ tgen,
+ topo,
+ dut,
+ interface,
+ group_addresses,
+ src_address=None,
+ addr_type="ipv4",
+ expected=True,
):
"""
Verify ip/ipv6 pim join by running "show ip/ipv6 pim join" cli
@@ -2846,11 +3225,22 @@ def verify_pim_join(
rnode = tgen.routers()[dut]
logger.info("[DUT: %s]: Verifying pim join", dut)
- show_pim_join_json = run_frr_cmd(rnode, "show ip pim join json", isjson=True)
if type(group_addresses) is not list:
group_addresses = [group_addresses]
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ show_pim_join_json = run_frr_cmd(
+ rnode, "show {} pim join json".format(ip_cmd), isjson=True
+ )
+
for grp_addr in group_addresses:
# Verify if IGMP is enabled in DUT
if "igmp" not in topo["routers"][dut]:
@@ -3660,7 +4050,7 @@ def verify_multicast_flag_state(
@retry(retry_timeout=40, diag_pct=0)
-def verify_igmp_interface(tgen, topo, dut, igmp_iface, interface_ip, expected=True):
+def verify_igmp_interface(tgen, dut, igmp_iface, interface_ip, expected=True):
"""
Verify all IGMP interface are up and running, config is verified
using "show ip igmp interface" cli
@@ -3884,7 +4274,7 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"):
"""
- Verify ip pim interface traffice by running
+ Verify ip pim interface traffic by running
"show ip pim interface traffic" cli
Parameters
@@ -3950,6 +4340,661 @@ def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type=
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True if return_stats == False else output_dict
+
+@retry(retry_timeout=40, diag_pct=0)
+def verify_mld_groups(tgen, dut, interface, group_addresses, expected=True):
+ """
+ Verify IGMP groups are received from an intended interface
+ by running "show ip mld groups" command
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `interface`: interface, from which MLD groups would be received
+ * `group_addresses`: MLD group address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ interface = "r1-r0-eth0"
+ group_address = "ffaa::1"
+ result = verify_mld_groups(tgen, dut, interface, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying mld groups received:", dut)
+ show_mld_json = run_frr_cmd(rnode, "show ipv6 mld groups json", isjson=True)
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ if interface in show_mld_json:
+ show_mld_json = show_mld_json[interface]["groups"]
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying MLD group received"
+ " from interface %s [FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ found = False
+ for grp_addr in group_addresses:
+ for index in show_mld_json:
+ if index["group"] == grp_addr:
+ found = True
+ break
+ if found is not True:
+ errormsg = (
+ "[DUT %s]: Verifying MLD group received"
+ " from interface %s [FAILED]!! "
+ " Expected not found: %s" % (dut, interface, grp_addr)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying MLD group %s received "
+ "from interface %s [PASSED]!! ",
+ dut,
+ grp_addr,
+ interface,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=40, diag_pct=0)
+def verify_mld_interface(tgen, dut, mld_iface, interface_ip, expected=True):
+ """
+ Verify all IGMP interface are up and running, config is verified
+ using "show ip mld interface" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : device under test
+ * `mld_iface` : interface name
+ * `interface_ip` : interface ip address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ result = verify_mld_interface(tgen, topo, dut, mld_iface, interface_ip)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in tgen.routers():
+ if router != dut:
+ continue
+
+ logger.info("[DUT: %s]: Verifying MLD interface status:", dut)
+
+ rnode = tgen.routers()[dut]
+ show_mld_interface_json = run_frr_cmd(
+ rnode, "show ipv6 mld interface json", isjson=True
+ )
+
+ if mld_iface in show_mld_interface_json:
+ mld_intf_json = show_mld_interface_json[mld_iface]
+ # Verifying igmp interface
+ if mld_intf_json["address"] != interface_ip:
+ errormsg = (
+ "[DUT %s]: igmp interface ip is not correct "
+ "[FAILED]!! Expected : %s, Found : %s"
+ % (dut, mld_intf_json["address"], interface_ip)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: igmp interface: %s, " "interface ip: %s" " [PASSED]!!",
+ dut,
+ mld_iface,
+ interface_ip,
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: igmp interface: %s "
+ "igmp interface ip: %s, is not present "
+ % (dut, mld_iface, interface_ip)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_mld_config(tgen, input_dict, stats_return=False, expected=True):
+ """
+ Verify mld interface details, verifying following configs:
+ timerQueryInterval
+ timerQueryResponseIntervalMsec
+ lastMemberQueryCount
+ timerLastMemberQueryMsec
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict` : Input dict data, required to verify
+ timer
+ * `stats_return`: If user wants API to return statistics
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ input_dict ={
+ "l1": {
+ "mld": {
+ "interfaces": {
+ "l1-i1-eth1": {
+ "mld": {
+ "query": {
+ "query-interval" : 200,
+ "query-max-response-time" : 100
+ },
+ "statistics": {
+ "queryV2" : 2,
+ "reportV2" : 1
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = verify_mld_config(tgen, input_dict, stats_return)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ for interface, data in input_dict[dut]["igmp"]["interfaces"].items():
+
+ statistics = False
+ report = False
+ if "statistics" in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"]:
+ statistics = True
+ cmd = "show ipv6 mld statistics"
+ else:
+ cmd = "show ipv6 mld"
+
+ logger.info("[DUT: %s]: Verifying MLD interface %s detail:", dut, interface)
+
+ if statistics:
+ if (
+ "report"
+ in input_dict[dut]["mld"]["interfaces"][interface]["mld"][
+ "statistics"
+ ]
+ ):
+ report = True
+
+ if statistics and report:
+ show_ipv6_mld_intf_json = run_frr_cmd(
+ rnode, "{} json".format(cmd), isjson=True
+ )
+ intf_detail_json = show_ipv6_mld_intf_json["global"]
+ else:
+ show_ipv6_mld_intf_json = run_frr_cmd(
+ rnode, "{} interface {} json".format(cmd, interface), isjson=True
+ )
+
+ if not report:
+ if interface not in show_ipv6_mld_intf_json:
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ " is not present in CLI output "
+ "[FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ else:
+ intf_detail_json = show_ipv6_mld_intf_json[interface]
+
+ if stats_return:
+ mld_stats = {}
+
+ if "statistics" in data["mld"]:
+ if stats_return:
+ mld_stats["statistics"] = {}
+ for query, value in data["mld"]["statistics"].items():
+ if query == "queryV1":
+ # Verifying IGMP interface queryV2 statistics
+ if stats_return:
+ mld_stats["statistics"][query] = intf_detail_json["queryV1"]
+
+ else:
+ if intf_detail_json["queryV1"] != value:
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ " queryV1 statistics verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ intf_detail_json["queryV1"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s "
+ "queryV1 statistics is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if query == "reportV1":
+ # Verifying IGMP interface timerV2 statistics
+ if stats_return:
+ mld_stats["statistics"][query] = intf_detail_json[
+ "reportV1"
+ ]
+
+ else:
+ if intf_detail_json["reportV1"] <= value:
+ errormsg = (
+ "[DUT %s]: MLD reportV1 "
+ "statistics verification "
+ "[FAILED]!! Expected : %s "
+ "or more, Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD reportV1 " "statistics is %s",
+ dut,
+ intf_detail_json["reportV1"],
+ )
+
+ if "query" in data["mld"]:
+ for query, value in data["mld"]["query"].items():
+ if query == "query-interval":
+ # Verifying IGMP interface query interval timer
+ if intf_detail_json["timerQueryInterval"] != value:
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ " query-interval verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ intf_detail_json["timerQueryInterval"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s " "query-interval is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if query == "query-max-response-time":
+ # Verifying IGMP interface query max response timer
+ if (
+ intf_detail_json["timerQueryResponseIntervalMsec"]
+ != value * 100
+ ):
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ "query-max-response-time "
+ "verification [FAILED]!!"
+ " Expected : %s, Found : %s"
+ % (
+ dut,
+ interface,
+ value * 1000,
+ intf_detail_json["timerQueryResponseIntervalMsec"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s "
+ "query-max-response-time is %s ms",
+ dut,
+ interface,
+ value * 100,
+ )
+
+ if query == "last-member-query-count":
+ # Verifying IGMP interface last member query count
+ if intf_detail_json["lastMemberQueryCount"] != value:
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ "last-member-query-count "
+ "verification [FAILED]!!"
+ " Expected : %s, Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ intf_detail_json["lastMemberQueryCount"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s "
+ "last-member-query-count is %s ms",
+ dut,
+ interface,
+ value * 1000,
+ )
+
+ if query == "last-member-query-interval":
+ # Verifying IGMP interface last member query interval
+ if (
+ intf_detail_json["timerLastMemberQueryMsec"]
+ != value * 100 * intf_detail_json["lastMemberQueryCount"]
+ ):
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ "last-member-query-interval "
+ "verification [FAILED]!!"
+ " Expected : %s, Found : %s"
+ % (
+ dut,
+ interface,
+ value * 1000,
+ intf_detail_json["timerLastMemberQueryMsec"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s "
+ "last-member-query-interval is %s ms",
+ dut,
+ interface,
+ value * intf_detail_json["lastMemberQueryCount"] * 100,
+ )
+
+ if "version" in data["mld"]:
+ # Verifying IGMP interface state is up
+ if intf_detail_json["state"] != "up":
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ " state: %s verification "
+ "[FAILED]!!" % (dut, interface, intf_detail_json["state"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s " "state: %s",
+ dut,
+ interface,
+ intf_detail_json["state"],
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True if stats_return == False else mld_stats
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_nexthop(tgen, topo, dut, nexthop, addr_type):
+ """
+ Verify all PIM nexthop details using "show ip/ipv6 pim neighbor" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : dut info
+ * `nexthop` : nexthop ip/ipv6 address
+
+ Usage
+ -----
+ result = verify_pim_nexthop(tgen, topo, dut, nexthop)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ rnode = tgen.routers()[dut]
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ cmd = "show {} pim nexthop".format(addr_type)
+ pim_nexthop = rnode.vtysh_cmd(cmd)
+
+ if nexthop in pim_nexthop:
+ logger.info("[DUT %s]: Expected nexthop: %s, Found", dut, nexthop)
+ return True
+ else:
+ errormsg = "[DUT %s]: Nexthop not found: %s" % (dut, nexthop)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_mroute_summary(
+ tgen, dut, sg_mroute=None, starg_mroute=None, total_mroute=None, addr_type="ipv4"
+):
+ """
+ Verify ip mroute summary has correct (*,g) (s,G) and total mroutes
+ by running "show ip mroutes summary json" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `sg_mroute`: Number of installed (s,g) mroute
+ * `starg_mroute`: Number installed of (*,g) mroute
+ * `Total_mroute`: Total number of installed mroutes
+ * 'addr_type : IPv4 or IPv6 address
+ * `return_json`: Whether to return raw json data
+
+ Usage
+ -----
+ dut = "r1"
+ sg_mroute = "4000"
+ starg_mroute= "2000"
+ total_mroute = "6000"
+ addr_type=IPv4 or IPv6
+ result = verify_mroute_summary(tgen, dut, sg_mroute=None, starg_mroute=None,
+ total_mroute= None)
+ Returns
+ -------
+ errormsg or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying mroute summary", dut)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ cmd = "show {} mroute summary json".format(ip_cmd)
+ show_mroute_summary_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ if starg_mroute is not None:
+ if show_mroute_summary_json["wildcardGroup"]["installed"] != starg_mroute:
+ logger.error(
+ "Number of installed starg are: %s but expected: %s",
+ show_mroute_summary_json["wildcardGroup"]["installed"],
+ starg_mroute,
+ )
+ return False
+ logger.info(
+ "Number of installed starg routes are %s",
+ show_mroute_summary_json["wildcardGroup"]["installed"],
+ )
+
+ if sg_mroute is not None:
+ if show_mroute_summary_json["sourceGroup"]["installed"] != sg_mroute:
+ logger.error(
+ "Number of installed SG routes are: %s but expected: %s",
+ show_mroute_summary_json["sourceGroup"]["installed"],
+ sg_mroute,
+ )
+ return False
+ logger.info(
+ "Number of installed SG routes are %s",
+ show_mroute_summary_json["sourceGroup"]["installed"],
+ )
+
+ if total_mroute is not None:
+ if show_mroute_summary_json["totalNumOfInstalledMroutes"] != total_mroute:
+ logger.error(
+ "Total number of installed mroutes are: %s but expected: %s",
+ show_mroute_summary_json["totalNumOfInstalledMroutes"],
+ total_mroute,
+ )
+ return False
+ logger.info(
+ "Number of installed Total mroute are %s",
+ show_mroute_summary_json["totalNumOfInstalledMroutes"],
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+def verify_sg_traffic(tgen, dut, groups, src, addr_type="ipv4"):
+ """
+ Verify multicast traffic by running
+ "show ip mroute count json" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `groups`: igmp or mld groups where traffic needs to be verified
+
+ Usage
+ -----
+ result = verify_sg_traffic(tgen, "r1", igmp_groups, srcaddress)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying multicast " "SG traffic", dut)
+
+ if addr_type == "ipv4":
+ cmd = "show ip mroute count json"
+ elif addr_type == "ipv6":
+ cmd = "show ipv6 mroute count json"
+ # import pdb; pdb.set_trace()
+ show_mroute_sg_traffic_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ if bool(show_mroute_sg_traffic_json) is False:
+ errormsg = "[DUT %s]: Json output is empty" % (dut)
+ return errormsg
+
+ before_traffic = {}
+ after_traffic = {}
+
+ for grp in groups:
+ if grp not in show_mroute_sg_traffic_json:
+ errormsg = "[DUT %s]: Verifying (%s, %s) mroute," "[FAILED]!! " % (
+ dut,
+ src,
+ grp,
+ )
+ if src not in show_mroute_sg_traffic_json[grp]:
+ errormsg = (
+ "[DUT %s]: Verifying source is not present in "
+ " %s [FAILED]!! " % (dut, src)
+ )
+ return errormsg
+
+ before_traffic[grp] = show_mroute_sg_traffic_json[grp][src]["packets"]
+
+ logger.info("Waiting for 10sec traffic to increament")
+ sleep(10)
+
+ show_mroute_sg_traffic_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ for grp in groups:
+ if grp not in show_mroute_sg_traffic_json:
+ errormsg = "[DUT %s]: Verifying (%s, %s) mroute," "[FAILED]!! " % (
+ dut,
+ src,
+ grp,
+ )
+ if src not in show_mroute_sg_traffic_json[grp]:
+ errormsg = (
+ "[DUT %s]: Verifying source is not present in "
+ " %s [FAILED]!! " % (dut, src)
+ )
+ return errormsg
+
+ after_traffic[grp] = show_mroute_sg_traffic_json[grp][src]["packets"]
+
+ for grp in groups:
+ if after_traffic[grp] < before_traffic[grp]:
+ errormsg = (
+ "[DUT %s]: Verifying igmp group %s source %s not increamenting traffic"
+ " [FAILED]!! " % (dut, grp, src)
+ )
+ return errormsg
+ else:
+ logger.info(
+ "[DUT %s]:igmp group %s source %s receiving traffic"
+ " [PASSED]!! " % (dut, grp, src)
+ )
+ result = True
+
+ return result
+
# def cleanup(self):
# super(McastTesterHelper, self).cleanup()