summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tests/topotests/lib/common_config.py160
-rw-r--r--tests/topotests/lib/pim.py1123
-rw-r--r--tests/topotests/multicast_pim6_static_rp_topo1/__init__.py0
-rw-r--r--tests/topotests/multicast_pim6_static_rp_topo1/multicast_pim6_static_rp.json (renamed from tests/topotests/multicast_pim_static_rp_topo1/multicast_pimv6_static_rp.json)0
-rwxr-xr-xtests/topotests/multicast_pim6_static_rp_topo1/test_multicast_pim6_static_rp1.py1321
-rwxr-xr-xtests/topotests/multicast_pim6_static_rp_topo1/test_multicast_pim6_static_rp2.py1324
-rwxr-xr-xtests/topotests/multicast_pim_static_rp_topo1/test_multicast_pimv6_static_rp.py414
-rw-r--r--tests/topotests/pytest.ini1
8 files changed, 3865 insertions, 478 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index 5f4c280715..737226c7fe 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -3244,33 +3244,29 @@ def configure_interface_mac(tgen, input_dict):
return True
-def socat_send_igmp_join_traffic(
+def socat_send_mld_join(
tgen,
server,
protocol_option,
- igmp_groups,
+ mld_groups,
send_from_intf,
send_from_intf_ip=None,
port=12345,
reuseaddr=True,
- join=False,
- traffic=False,
):
"""
- API to send IGMP join using SOCAT tool
+ API to send MLD join using SOCAT tool
Parameters:
-----------
* `tgen` : Topogen object
* `server`: iperf server, from where IGMP join would be sent
* `protocol_option`: Protocol options, ex: UDP6-RECV
- * `igmp_groups`: IGMP group for which join has to be sent
+ * `mld_groups`: IGMP group for which join has to be sent
* `send_from_intf`: Interface from which join would be sent
* `send_from_intf_ip`: Interface IP, default is None
* `port`: Port to be used, default is 12345
* `reuseaddr`: True|False, bydefault True
- * `join`: If join needs to be sent
- * `traffic`: If traffic needs to be sent
returns:
--------
@@ -3280,36 +3276,32 @@ def socat_send_igmp_join_traffic(
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
rnode = tgen.routers()[server]
- socat_cmd = "socat -u "
+ socat_args = "socat -u "
- # UDP4/TCP4/UDP6/UDP6-RECV
+ # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
if protocol_option:
- socat_cmd += "{}".format(protocol_option)
+ socat_args += "{}".format(protocol_option)
if port:
- socat_cmd += ":{},".format(port)
+ socat_args += ":{},".format(port)
if reuseaddr:
- socat_cmd += "{},".format("reuseaddr")
+ socat_args += "{},".format("reuseaddr")
# Group address range to cover
- if igmp_groups:
- if not isinstance(igmp_groups, list):
- igmp_groups = [igmp_groups]
+ if mld_groups:
+ if not isinstance(mld_groups, list):
+ mld_groups = [mld_groups]
- for igmp_group in igmp_groups:
- if join:
- join_traffic_option = "ipv6-join-group"
- elif traffic:
- join_traffic_option = "ipv6-join-group-source"
+ for mld_group in mld_groups:
+ socat_cmd = socat_args
+ join_option = "ipv6-join-group"
if send_from_intf and not send_from_intf_ip:
- socat_cmd += "{}='[{}]:{}'".format(
- join_traffic_option, igmp_group, send_from_intf
- )
+ socat_cmd += "{}='[{}]:{}'".format(join_option, mld_group, send_from_intf)
else:
socat_cmd += "{}='[{}]:{}:[{}]'".format(
- join_traffic_option, igmp_group, send_from_intf, send_from_intf_ip
+ join_option, mld_group, send_from_intf, send_from_intf_ip
)
socat_cmd += " STDOUT"
@@ -3324,6 +3316,124 @@ def socat_send_igmp_join_traffic(
return True
+def socat_send_pim6_traffic(
+ tgen,
+ server,
+ protocol_option,
+ mld_groups,
+ send_from_intf,
+ port=12345,
+ multicast_hops=True,
+):
+ """
+ API to send pim6 data taffic using SOCAT tool
+
+ Parameters:
+ -----------
+ * `tgen` : Topogen object
+ * `server`: iperf server, from where IGMP join would be sent
+ * `protocol_option`: Protocol options, ex: UDP6-RECV
+ * `mld_groups`: MLD group for which join has to be sent
+ * `send_from_intf`: Interface from which join would be sent
+ * `port`: Port to be used, default is 12345
+ * `multicast_hops`: multicast-hops count, default is 255
+
+ returns:
+ --------
+ errormsg or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ rnode = tgen.routers()[server]
+ socat_args = "socat -u STDIO "
+
+ # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
+ if protocol_option:
+ socat_args += "'{}".format(protocol_option)
+
+ # Group address range to cover
+ if mld_groups:
+ if not isinstance(mld_groups, list):
+ mld_groups = [mld_groups]
+
+ for mld_group in mld_groups:
+ socat_cmd = socat_args
+ if port:
+ socat_cmd += ":[{}]:{},".format(mld_group, port)
+
+ if send_from_intf:
+ socat_cmd += "interface={0},so-bindtodevice={0},".format(send_from_intf)
+
+ if multicast_hops:
+ socat_cmd += "multicast-hops=255'"
+
+ socat_cmd += " &>{}/socat.logs &".format(tgen.logdir)
+
+ # Run socat command to send pim6 traffic
+ logger.info(
+ "[DUT: {}]: Running command: [set +m; ( while sleep 1; do date; done ) | {}]".format(
+ server, socat_cmd
+ )
+ )
+
+ # Open a shell script file and write data to it, which will be
+ # used to send pim6 traffic continously
+ traffic_shell_script = "{}/{}/traffic.sh".format(tgen.logdir, server)
+ with open("{}".format(traffic_shell_script), "w") as taffic_sh:
+ taffic_sh.write(
+ "#!/usr/bin/env bash\n( while sleep 1; do date; done ) | {}\n".format(
+ socat_cmd
+ )
+ )
+
+ rnode.run("chmod 755 {}".format(traffic_shell_script))
+ output = rnode.run("{} &> /dev/null".format(traffic_shell_script))
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+def kill_socat(tgen, dut=None, action=None):
+ """
+ Killing socat process if running for any router in topology
+
+ Parameters:
+ -----------
+ * `tgen` : Topogen object
+ * `dut` : Any iperf hostname to send igmp prune
+ * `action`: to kill mld join using socat
+ to kill mld traffic using socat
+
+ Usage:
+ ------
+ kill_socat(tgen, dut ="i6", action="remove_mld_join")
+
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ router_list = tgen.routers()
+ for router, rnode in router_list.items():
+ if dut is not None and router != dut:
+ continue
+
+ if action == "remove_mld_join":
+ cmd = "ps -ef | grep socat | grep UDP6-RECV | grep {}".format(router)
+ elif action == "remove_mld_traffic":
+ cmd = "ps -ef | grep socat | grep UDP6-SEND | grep {}".format(router)
+ else:
+ cmd = "ps -ef | grep socat".format(router)
+
+ awk_cmd = "awk -F' ' '{print $2}' | xargs kill -9 &>/dev/null &"
+ cmd = "{} | {}".format(cmd, awk_cmd)
+
+ logger.debug("[DUT: {}]: Running command: [{}]".format(router, cmd))
+ rnode.run(cmd)
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+
#############################################
# Verification APIs
#############################################
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py
index 03ab02460f..7a57af7dbf 100644
--- a/tests/topotests/lib/pim.py
+++ b/tests/topotests/lib/pim.py
@@ -37,6 +37,7 @@ from lib.common_config import (
retry,
run_frr_cmd,
validate_ip_address,
+ get_frr_ipv6_linklocal,
)
from lib.micronet import get_exec_path
from lib.topolog import logger
@@ -48,7 +49,7 @@ CWD = os.path.dirname(os.path.realpath(__file__))
def create_pim_config(tgen, topo, input_dict=None, build=False, load_config=True):
"""
- API to configure pim/pimv6 on router
+ API to configure pim/pim6 on router
Parameters
----------
@@ -149,7 +150,7 @@ def _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict):
if "rp" in input_dict[router]["pim"]:
rp_data += pim_data["rp"]
- # PIMv6
+ # pim6
pim6_data = None
if "pim6" in input_dict[router]:
pim6_data = input_dict[router]["pim6"]
@@ -370,7 +371,7 @@ def create_igmp_config(tgen, topo, input_dict=None, build=False):
def create_mld_config(tgen, topo, input_dict=None, build=False):
"""
- API to configure mld for PIMv6 on router
+ API to configure mld for pim6 on router
Parameters
----------
@@ -515,6 +516,19 @@ def _enable_disable_pim_config(tgen, topo, input_dict, router, build=False):
config_data.append(cmd)
config_data.append("ip pim")
+ if "pim" in input_dict[router]:
+ if "disable" in input_dict[router]["pim"]:
+ enable_flag = False
+ interfaces = input_dict[router]["pim"]["disable"]
+
+ if type(interfaces) is not list:
+ interfaces = [interfaces]
+
+ for interface in interfaces:
+ cmd = "interface {}".format(interface)
+ config_data.append(cmd)
+ config_data.append("no ip pim")
+
if "pim6" in data and data["pim6"] == "enable":
# Loopback interfaces
if "type" in data and data["type"] == "loopback":
@@ -526,6 +540,19 @@ def _enable_disable_pim_config(tgen, topo, input_dict, router, build=False):
config_data.append(cmd)
config_data.append("ipv6 pim")
+ if "pim6" in input_dict[router]:
+ if "disable" in input_dict[router]["pim6"]:
+ enable_flag = False
+ interfaces = input_dict[router]["pim6"]["disable"]
+
+ if type(interfaces) is not list:
+ interfaces = [interfaces]
+
+ for interface in interfaces:
+ cmd = "interface {}".format(interface)
+ config_data.append(cmd)
+ config_data.append("no ipv6 pim")
+
# pim global config
if "pim" in input_dict[router]:
pim_data = input_dict[router]["pim"]
@@ -797,6 +824,134 @@ def verify_pim_neighbors(tgen, topo, dut=None, iface=None, nbr_ip=None, expected
return True
+@retry(retry_timeout=12)
+def verify_pim6_neighbors(tgen, topo, dut=None, iface=None, nbr_ip=None, expected=True):
+ """
+ Verify all pim6 neighbors are up and running, config is verified
+ using "show ipv6 pim neighbor" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : dut info
+ * `iface` : link for which PIM nbr need to check
+ * `nbr_ip` : neighbor ip of interface
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ result = verify_pim6_neighbors(tgen, topo, dut, iface=ens192, nbr_ip=20.1.1.2)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in tgen.routers():
+ if dut is not None and dut != router:
+ continue
+
+ rnode = tgen.routers()[router]
+ show_ip_pim_neighbor_json = rnode.vtysh_cmd(
+ "show ipv6 pim neighbor json", isjson=True
+ )
+
+ for destLink, data in topo["routers"][router]["links"].items():
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if iface is not None and iface != data["interface"]:
+ continue
+
+ if "pim6" not in data:
+ continue
+
+ if "pim6" in data and data["pim6"] == "disable":
+ continue
+
+ if "pim6" in data and data["pim6"] == "enable":
+ local_interface = data["interface"]
+
+ if "-" in destLink:
+ # Spliting and storing destRouterLink data in tempList
+ tempList = destLink.split("-")
+
+ # destRouter
+ destLink = tempList.pop(0)
+
+ # Current Router Link
+ tempList.insert(0, router)
+ curRouter = "-".join(tempList)
+ else:
+ curRouter = router
+ if destLink not in topo["routers"]:
+ continue
+ data = topo["routers"][destLink]["links"][curRouter]
+ peer_interface = data["interface"]
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if "pim6" not in data:
+ continue
+
+ logger.info("[DUT: %s]: Verifying PIM neighbor status:", router)
+
+ if "pim6" in data and data["pim6"] == "enable":
+ pim_nh_intf_ip = get_frr_ipv6_linklocal(tgen, destLink, peer_interface)
+
+ # Verifying PIM neighbor
+ if local_interface in show_ip_pim_neighbor_json:
+ if show_ip_pim_neighbor_json[local_interface]:
+ if (
+ show_ip_pim_neighbor_json[local_interface][pim_nh_intf_ip][
+ "neighbor"
+ ]
+ != pim_nh_intf_ip
+ ):
+ errormsg = (
+ "[DUT %s]: Local interface: %s, PIM6"
+ " neighbor check failed "
+ "Expected neighbor: %s, Found neighbor:"
+ " %s"
+ % (
+ router,
+ local_interface,
+ pim_nh_intf_ip,
+ show_ip_pim_neighbor_json[local_interface][
+ pim_nh_intf_ip
+ ]["neighbor"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Local interface: %s, Found"
+ " expected PIM6 neighbor %s",
+ router,
+ local_interface,
+ pim_nh_intf_ip,
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Local interface: %s, and"
+ "interface ip: %s is not found in "
+ "PIM6 neighbor " % (router, local_interface, pim_nh_intf_ip)
+ )
+ return errormsg
+ else:
+ errormsg = (
+ "[DUT %s]: Local interface: %s, is not "
+ "present in PIM6 neighbor " % (router, local_interface)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
@retry(retry_timeout=40, diag_pct=0)
def verify_igmp_groups(tgen, dut, interface, group_addresses, expected=True):
"""
@@ -871,7 +1026,7 @@ def verify_igmp_groups(tgen, dut, interface, group_addresses, expected=True):
return True
-@retry(retry_timeout=60, diag_pct=0)
+@retry(retry_timeout=60, diag_pct=2)
def verify_upstream_iif(
tgen,
dut,
@@ -879,7 +1034,9 @@ def verify_upstream_iif(
src_address,
group_addresses,
joinState=None,
+ regState=None,
refCount=1,
+ addr_type="ipv4",
expected=True,
):
"""
@@ -910,7 +1067,6 @@ def verify_upstream_iif(
-------
errormsg(str) or True
"""
-
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if dut not in tgen.routers():
@@ -919,7 +1075,8 @@ def verify_upstream_iif(
rnode = tgen.routers()[dut]
logger.info(
- "[DUT: %s]: Verifying upstream Inbound Interface" " for IGMP groups received:",
+ "[DUT: %s]: Verifying upstream Inbound Interface"
+ " for IGMP/MLD groups received:",
dut,
)
@@ -1010,14 +1167,33 @@ def verify_upstream_iif(
)
return errormsg
+ if regState:
+ if group_addr_json[src_address]["regState"] != regState:
+ errormsg = (
+ "[DUT %s]: Verifying iif "
+ "(Inbound Interface) for (%s,%s) and"
+ " rejstate :%s [FAILED]!! "
+ " Expected: %s, Found: %s"
+ % (
+ dut,
+ src_address,
+ grp_addr,
+ group_addr_json[src_address]["regState"],
+ in_interface,
+ group_addr_json[src_address]["inboundInterface"],
+ )
+ )
+ return errormsg
+
logger.info(
"[DUT %s]: Verifying iif(Inbound Interface)"
- " for (%s,%s) and joinState is %s [PASSED]!! "
+ " for (%s,%s) and joinState is %s regstate is %s [PASSED]!! "
" Found Expected: (%s)",
dut,
src_address,
grp_addr,
group_addr_json[src_address]["joinState"],
+ group_addr_json[src_address]["regState"],
group_addr_json[src_address]["inboundInterface"],
)
if not found:
@@ -1042,7 +1218,7 @@ def verify_upstream_iif(
@retry(retry_timeout=12)
def verify_join_state_and_timer(
- tgen, dut, iif, src_address, group_addresses, expected=True
+ tgen, dut, iif, src_address, group_addresses, addr_type="ipv4", expected=True
):
"""
Verify join state is updated correctly and join timer is
@@ -1178,6 +1354,7 @@ def verify_mroutes(
oil,
return_uptime=False,
mwait=0,
+ addr_type="ipv4",
expected=True,
):
"""
@@ -1393,6 +1570,7 @@ def verify_pim_rp_info(
rp=None,
source=None,
iamrp=None,
+ addr_type="ipv4",
expected=True,
):
"""
@@ -1578,6 +1756,7 @@ def verify_pim_state(
group_addresses,
src_address=None,
installed_fl=None,
+ addr_type="ipv4",
expected=True,
):
"""
@@ -1697,7 +1876,7 @@ def verify_pim_state(
def get_pim_interface_traffic(tgen, input_dict):
"""
- get ip pim interface traffice by running
+ get ip pim interface traffic by running
"show ip pim interface traffic" cli
Parameters
@@ -1768,9 +1947,82 @@ def get_pim_interface_traffic(tgen, input_dict):
return output_dict
+def get_pim6_interface_traffic(tgen, input_dict):
+ """
+ get ipv6 pim interface traffic by running
+ "show ipv6 pim interface traffic" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict(dict)`: defines DUT, what and from which interfaces
+ traffic needs to be retrieved
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "r1-r0-eth0": {
+ "helloRx": 0,
+ "helloTx": 1,
+ "joinRx": 0,
+ "joinTx": 0
+ }
+ }
+ }
+
+ result = get_pim_interface_traffic(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ output_dict = {}
+ for dut in input_dict.keys():
+ if dut not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying pim interface traffic", dut)
+
+ def show_pim_intf_traffic(rnode, dut, input_dict, output_dict):
+ show_pim_intf_traffic_json = run_frr_cmd(
+ rnode, "show ipv6 pim interface traffic json", isjson=True
+ )
+
+ output_dict[dut] = {}
+ for intf, data in input_dict[dut].items():
+ interface_json = show_pim_intf_traffic_json[intf]
+ for state in data:
+
+ # Verify Tx/Rx
+ if state in interface_json:
+ output_dict[dut][state] = interface_json[state]
+ else:
+ errormsg = (
+ "[DUT %s]: %s is not present"
+ "for interface %s [FAILED]!! " % (dut, state, intf)
+ )
+ return errormsg
+ return None
+
+ test_func = functools.partial(
+ show_pim_intf_traffic, rnode, dut, input_dict, output_dict
+ )
+ (result, out) = topotest.run_and_expect(test_func, None, count=20, wait=1)
+ if not result:
+ return out
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return output_dict
+
+
@retry(retry_timeout=40, diag_pct=0)
def verify_pim_interface(
- tgen, topo, dut, interface=None, interface_ip=None, expected=True
+ tgen, topo, dut, interface=None, interface_ip=None, addr_type="ipv4", expected=True
):
"""
Verify all PIM interface are up and running, config is verified
@@ -1803,29 +2055,48 @@ def verify_pim_interface(
logger.info("[DUT: %s]: Verifying PIM interface status:", dut)
rnode = tgen.routers()[dut]
- show_ip_pim_interface_json = rnode.vtysh_cmd(
- "show ip pim interface json", isjson=True
+
+ if addr_type == "ipv4":
+ addr_cmd = "ip"
+ pim_cmd = "pim"
+ elif addr_type == "ipv6":
+ addr_cmd = "ipv6"
+ pim_cmd = "pim6"
+ show_pim_interface_json = rnode.vtysh_cmd(
+ "show {} pim interface json".format(addr_cmd), isjson=True
)
- logger.info("show_ip_pim_interface_json: \n %s", show_ip_pim_interface_json)
+ logger.info("show_pim_interface_json: \n %s", show_pim_interface_json)
if interface_ip:
- if interface in show_ip_pim_interface_json:
- pim_intf_json = show_ip_pim_interface_json[interface]
+ if interface in show_pim_interface_json:
+ pim_intf_json = show_pim_interface_json[interface]
if pim_intf_json["address"] != interface_ip:
errormsg = (
- "[DUT %s]: PIM interface "
- "ip is not correct "
+ "[DUT %s]: %s interface "
+ "%s is not correct "
"[FAILED]!! Expected : %s, Found : %s"
- % (dut, pim_intf_json["address"], interface_ip)
+ % (
+ dut,
+ pim_cmd,
+ addr_cmd,
+ pim_intf_json["address"],
+ interface_ip,
+ )
)
return errormsg
else:
logger.info(
- "[DUT %s]: PIM interface "
- "ip is correct "
+ "[DUT %s]: %s interface "
+ "%s is correct "
"[Passed]!! Expected : %s, Found : %s"
- % (dut, pim_intf_json["address"], interface_ip)
+ % (
+ dut,
+ pim_cmd,
+ addr_cmd,
+ pim_intf_json["address"],
+ interface_ip,
+ )
)
return True
else:
@@ -1833,17 +2104,17 @@ def verify_pim_interface(
if "type" in data and data["type"] == "loopback":
continue
- if "pim" in data and data["pim"] == "enable":
+ if pim_cmd in data and data[pim_cmd] == "enable":
pim_interface = data["interface"]
- pim_intf_ip = data["ipv4"].split("/")[0]
+ pim_intf_ip = data[addr_type].split("/")[0]
- if pim_interface in show_ip_pim_interface_json:
- pim_intf_json = show_ip_pim_interface_json[pim_interface]
+ if pim_interface in show_pim_interface_json:
+ pim_intf_json = show_pim_interface_json[pim_interface]
else:
errormsg = (
- "[DUT %s]: PIM interface: %s "
- "PIM interface ip: %s, not Found"
- % (dut, pim_interface, pim_intf_ip)
+ "[DUT %s]: %s interface: %s "
+ "PIM interface %s: %s, not Found"
+ % (dut, pim_cmd, pim_interface, addr_cmd, pim_intf_ip)
)
return errormsg
@@ -1853,12 +2124,14 @@ def verify_pim_interface(
and pim_intf_json["state"] != "up"
):
errormsg = (
- "[DUT %s]: PIM interface: %s "
- "PIM interface ip: %s, status check "
+ "[DUT %s]: %s interface: %s "
+ "PIM interface %s: %s, status check "
"[FAILED]!! Expected : %s, Found : %s"
% (
dut,
+ pim_cmd,
pim_interface,
+ addr_cmd,
pim_intf_ip,
pim_interface,
pim_intf_json["state"],
@@ -1867,11 +2140,13 @@ def verify_pim_interface(
return errormsg
logger.info(
- "[DUT %s]: PIM interface: %s, "
- "interface ip: %s, status: %s"
+ "[DUT %s]: %s interface: %s, "
+ "interface %s: %s, status: %s"
" [PASSED]!!",
dut,
+ pim_cmd,
pim_interface,
+ addr_cmd,
pim_intf_ip,
pim_intf_json["state"],
)
@@ -1882,8 +2157,8 @@ def verify_pim_interface(
def clear_pim_interface_traffic(tgen, topo):
"""
- Clear ip/ipv6 pim interface traffice by running
- "clear ip/ipv6 pim interface traffic" cli
+ Clear ip pim interface traffic by running
+ "clear ip pim interface traffic" cli
Parameters
----------
@@ -1914,6 +2189,74 @@ def clear_pim_interface_traffic(tgen, topo):
return True
+def clear_pim6_interface_traffic(tgen, topo):
+ """
+ Clear ipv6 pim interface traffic by running
+ "clear ipv6 pim interface traffic" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ Usage
+ -----
+
+ result = clear_pim6_interface_traffic(tgen, topo)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in tgen.routers():
+ if "pim" not in topo["routers"][dut]:
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Clearing pim6 interface traffic", dut)
+ result = run_frr_cmd(rnode, "clear ipv6 pim interface traffic")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
+def clear_pim6_interfaces(tgen, topo):
+ """
+ Clear ipv6 pim interface by running
+ "clear ipv6 pim interface" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ Usage
+ -----
+
+ result = clear_pim6_interfaces(tgen, topo)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in tgen.routers():
+ if "pim" not in topo["routers"][dut]:
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Clearing pim6 interfaces", dut)
+ result = run_frr_cmd(rnode, "clear ipv6 pim interface")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
def clear_pim_interfaces(tgen, dut):
"""
Clear ip/ipv6 pim interface by running
@@ -1961,8 +2304,8 @@ def clear_pim_interfaces(tgen, dut):
# Waiting for maximum 60 sec
fail_intf = []
for retry in range(1, 13):
- logger.info("[DUT: %s]: Waiting for 5 sec for PIM neighbors" " to come up", dut)
sleep(5)
+ logger.info("[DUT: %s]: Waiting for 5 sec for PIM neighbors" " to come up", dut)
run_json_after = run_frr_cmd(rnode, "show ip pim neighbor json", isjson=True)
found = True
for pim_intf in nh_before_clear.keys():
@@ -2212,6 +2555,35 @@ def clear_mroute(tgen, dut=None):
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+def clear_pim6_mroute(tgen, dut=None):
+ """
+ Clear ipv6 mroute by running "clear ipv6 mroute" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test, default None
+
+ Usage
+ -----
+ clear_mroute(tgen, dut)
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ router_list = tgen.routers()
+ for router, rnode in router_list.items():
+ if dut is not None and router != dut:
+ continue
+
+ logger.debug("[DUT: %s]: Clearing ipv6 mroute", router)
+ rnode.vtysh_cmd("clear ipv6 mroute")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
def reconfig_interfaces(tgen, topo, senderRouter, receiverRouter, packet=None):
"""
Configure interface ip for sender and receiver routers
@@ -2812,7 +3184,14 @@ def enable_disable_pim_bsm(tgen, router, intf, enable=True):
@retry(retry_timeout=60, diag_pct=0)
def verify_pim_join(
- tgen, topo, dut, interface, group_addresses, src_address=None, expected=True
+ tgen,
+ topo,
+ dut,
+ interface,
+ group_addresses,
+ src_address=None,
+ addr_type="ipv4",
+ expected=True,
):
"""
Verify ip/ipv6 pim join by running "show ip/ipv6 pim join" cli
@@ -2846,11 +3225,22 @@ def verify_pim_join(
rnode = tgen.routers()[dut]
logger.info("[DUT: %s]: Verifying pim join", dut)
- show_pim_join_json = run_frr_cmd(rnode, "show ip pim join json", isjson=True)
if type(group_addresses) is not list:
group_addresses = [group_addresses]
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ show_pim_join_json = run_frr_cmd(
+ rnode, "show {} pim join json".format(ip_cmd), isjson=True
+ )
+
for grp_addr in group_addresses:
# Verify if IGMP is enabled in DUT
if "igmp" not in topo["routers"][dut]:
@@ -3660,7 +4050,7 @@ def verify_multicast_flag_state(
@retry(retry_timeout=40, diag_pct=0)
-def verify_igmp_interface(tgen, topo, dut, igmp_iface, interface_ip, expected=True):
+def verify_igmp_interface(tgen, dut, igmp_iface, interface_ip, expected=True):
"""
Verify all IGMP interface are up and running, config is verified
using "show ip igmp interface" cli
@@ -3884,7 +4274,7 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"):
"""
- Verify ip pim interface traffice by running
+ Verify ip pim interface traffic by running
"show ip pim interface traffic" cli
Parameters
@@ -3950,6 +4340,661 @@ def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type=
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True if return_stats == False else output_dict
+
+@retry(retry_timeout=40, diag_pct=0)
+def verify_mld_groups(tgen, dut, interface, group_addresses, expected=True):
+ """
+ Verify IGMP groups are received from an intended interface
+ by running "show ip mld groups" command
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `interface`: interface, from which MLD groups would be received
+ * `group_addresses`: MLD group address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ interface = "r1-r0-eth0"
+ group_address = "ffaa::1"
+ result = verify_mld_groups(tgen, dut, interface, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying mld groups received:", dut)
+ show_mld_json = run_frr_cmd(rnode, "show ipv6 mld groups json", isjson=True)
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ if interface in show_mld_json:
+ show_mld_json = show_mld_json[interface]["groups"]
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying MLD group received"
+ " from interface %s [FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ found = False
+ for grp_addr in group_addresses:
+ for index in show_mld_json:
+ if index["group"] == grp_addr:
+ found = True
+ break
+ if found is not True:
+ errormsg = (
+ "[DUT %s]: Verifying MLD group received"
+ " from interface %s [FAILED]!! "
+ " Expected not found: %s" % (dut, interface, grp_addr)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying MLD group %s received "
+ "from interface %s [PASSED]!! ",
+ dut,
+ grp_addr,
+ interface,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=40, diag_pct=0)
+def verify_mld_interface(tgen, dut, mld_iface, interface_ip, expected=True):
+ """
+ Verify all IGMP interface are up and running, config is verified
+ using "show ip mld interface" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : device under test
+ * `mld_iface` : interface name
+ * `interface_ip` : interface ip address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ result = verify_mld_interface(tgen, topo, dut, mld_iface, interface_ip)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in tgen.routers():
+ if router != dut:
+ continue
+
+ logger.info("[DUT: %s]: Verifying MLD interface status:", dut)
+
+ rnode = tgen.routers()[dut]
+ show_mld_interface_json = run_frr_cmd(
+ rnode, "show ipv6 mld interface json", isjson=True
+ )
+
+ if mld_iface in show_mld_interface_json:
+ mld_intf_json = show_mld_interface_json[mld_iface]
+ # Verifying igmp interface
+ if mld_intf_json["address"] != interface_ip:
+ errormsg = (
+ "[DUT %s]: igmp interface ip is not correct "
+ "[FAILED]!! Expected : %s, Found : %s"
+ % (dut, mld_intf_json["address"], interface_ip)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: igmp interface: %s, " "interface ip: %s" " [PASSED]!!",
+ dut,
+ mld_iface,
+ interface_ip,
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: igmp interface: %s "
+ "igmp interface ip: %s, is not present "
+ % (dut, mld_iface, interface_ip)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_mld_config(tgen, input_dict, stats_return=False, expected=True):
+ """
+ Verify mld interface details, verifying following configs:
+ timerQueryInterval
+ timerQueryResponseIntervalMsec
+ lastMemberQueryCount
+ timerLastMemberQueryMsec
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict` : Input dict data, required to verify
+ timer
+ * `stats_return`: If user wants API to return statistics
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ input_dict ={
+ "l1": {
+ "mld": {
+ "interfaces": {
+ "l1-i1-eth1": {
+ "mld": {
+ "query": {
+ "query-interval" : 200,
+ "query-max-response-time" : 100
+ },
+ "statistics": {
+ "queryV2" : 2,
+ "reportV2" : 1
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = verify_mld_config(tgen, input_dict, stats_return)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ for interface, data in input_dict[dut]["igmp"]["interfaces"].items():
+
+ statistics = False
+ report = False
+ if "statistics" in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"]:
+ statistics = True
+ cmd = "show ipv6 mld statistics"
+ else:
+ cmd = "show ipv6 mld"
+
+ logger.info("[DUT: %s]: Verifying MLD interface %s detail:", dut, interface)
+
+ if statistics:
+ if (
+ "report"
+ in input_dict[dut]["mld"]["interfaces"][interface]["mld"][
+ "statistics"
+ ]
+ ):
+ report = True
+
+ if statistics and report:
+ show_ipv6_mld_intf_json = run_frr_cmd(
+ rnode, "{} json".format(cmd), isjson=True
+ )
+ intf_detail_json = show_ipv6_mld_intf_json["global"]
+ else:
+ show_ipv6_mld_intf_json = run_frr_cmd(
+ rnode, "{} interface {} json".format(cmd, interface), isjson=True
+ )
+
+ if not report:
+ if interface not in show_ipv6_mld_intf_json:
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ " is not present in CLI output "
+ "[FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ else:
+ intf_detail_json = show_ipv6_mld_intf_json[interface]
+
+ if stats_return:
+ mld_stats = {}
+
+ if "statistics" in data["mld"]:
+ if stats_return:
+ mld_stats["statistics"] = {}
+ for query, value in data["mld"]["statistics"].items():
+ if query == "queryV1":
+ # Verifying IGMP interface queryV2 statistics
+ if stats_return:
+ mld_stats["statistics"][query] = intf_detail_json["queryV1"]
+
+ else:
+ if intf_detail_json["queryV1"] != value:
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ " queryV1 statistics verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ intf_detail_json["queryV1"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s "
+ "queryV1 statistics is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if query == "reportV1":
+ # Verifying IGMP interface timerV2 statistics
+ if stats_return:
+ mld_stats["statistics"][query] = intf_detail_json[
+ "reportV1"
+ ]
+
+ else:
+ if intf_detail_json["reportV1"] <= value:
+ errormsg = (
+ "[DUT %s]: MLD reportV1 "
+ "statistics verification "
+ "[FAILED]!! Expected : %s "
+ "or more, Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD reportV1 " "statistics is %s",
+ dut,
+ intf_detail_json["reportV1"],
+ )
+
+ if "query" in data["mld"]:
+ for query, value in data["mld"]["query"].items():
+ if query == "query-interval":
+ # Verifying IGMP interface query interval timer
+ if intf_detail_json["timerQueryInterval"] != value:
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ " query-interval verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ intf_detail_json["timerQueryInterval"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s " "query-interval is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if query == "query-max-response-time":
+ # Verifying IGMP interface query max response timer
+ if (
+ intf_detail_json["timerQueryResponseIntervalMsec"]
+ != value * 100
+ ):
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ "query-max-response-time "
+ "verification [FAILED]!!"
+ " Expected : %s, Found : %s"
+ % (
+ dut,
+ interface,
+ value * 1000,
+ intf_detail_json["timerQueryResponseIntervalMsec"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s "
+ "query-max-response-time is %s ms",
+ dut,
+ interface,
+ value * 100,
+ )
+
+ if query == "last-member-query-count":
+ # Verifying IGMP interface last member query count
+ if intf_detail_json["lastMemberQueryCount"] != value:
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ "last-member-query-count "
+ "verification [FAILED]!!"
+ " Expected : %s, Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ intf_detail_json["lastMemberQueryCount"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s "
+ "last-member-query-count is %s ms",
+ dut,
+ interface,
+ value * 1000,
+ )
+
+ if query == "last-member-query-interval":
+ # Verifying IGMP interface last member query interval
+ if (
+ intf_detail_json["timerLastMemberQueryMsec"]
+ != value * 100 * intf_detail_json["lastMemberQueryCount"]
+ ):
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ "last-member-query-interval "
+ "verification [FAILED]!!"
+ " Expected : %s, Found : %s"
+ % (
+ dut,
+ interface,
+ value * 1000,
+ intf_detail_json["timerLastMemberQueryMsec"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s "
+ "last-member-query-interval is %s ms",
+ dut,
+ interface,
+ value * intf_detail_json["lastMemberQueryCount"] * 100,
+ )
+
+ if "version" in data["mld"]:
+ # Verifying IGMP interface state is up
+ if intf_detail_json["state"] != "up":
+ errormsg = (
+ "[DUT %s]: MLD interface: %s "
+ " state: %s verification "
+ "[FAILED]!!" % (dut, interface, intf_detail_json["state"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: MLD interface: %s " "state: %s",
+ dut,
+ interface,
+ intf_detail_json["state"],
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True if stats_return == False else mld_stats
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_nexthop(tgen, topo, dut, nexthop, addr_type):
+ """
+ Verify all PIM nexthop details using "show ip/ipv6 pim neighbor" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : dut info
+ * `nexthop` : nexthop ip/ipv6 address
+
+ Usage
+ -----
+ result = verify_pim_nexthop(tgen, topo, dut, nexthop)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ rnode = tgen.routers()[dut]
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ cmd = "show {} pim nexthop".format(addr_type)
+ pim_nexthop = rnode.vtysh_cmd(cmd)
+
+ if nexthop in pim_nexthop:
+ logger.info("[DUT %s]: Expected nexthop: %s, Found", dut, nexthop)
+ return True
+ else:
+ errormsg = "[DUT %s]: Nexthop not found: %s" % (dut, nexthop)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_mroute_summary(
+ tgen, dut, sg_mroute=None, starg_mroute=None, total_mroute=None, addr_type="ipv4"
+):
+ """
+ Verify ip mroute summary has correct (*,g) (s,G) and total mroutes
+ by running "show ip mroutes summary json" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `sg_mroute`: Number of installed (s,g) mroute
+ * `starg_mroute`: Number installed of (*,g) mroute
+ * `Total_mroute`: Total number of installed mroutes
+ * 'addr_type : IPv4 or IPv6 address
+ * `return_json`: Whether to return raw json data
+
+ Usage
+ -----
+ dut = "r1"
+ sg_mroute = "4000"
+ starg_mroute= "2000"
+ total_mroute = "6000"
+ addr_type=IPv4 or IPv6
+ result = verify_mroute_summary(tgen, dut, sg_mroute=None, starg_mroute=None,
+ total_mroute= None)
+ Returns
+ -------
+ errormsg or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying mroute summary", dut)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ cmd = "show {} mroute summary json".format(ip_cmd)
+ show_mroute_summary_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ if starg_mroute is not None:
+ if show_mroute_summary_json["wildcardGroup"]["installed"] != starg_mroute:
+ logger.error(
+ "Number of installed starg are: %s but expected: %s",
+ show_mroute_summary_json["wildcardGroup"]["installed"],
+ starg_mroute,
+ )
+ return False
+ logger.info(
+ "Number of installed starg routes are %s",
+ show_mroute_summary_json["wildcardGroup"]["installed"],
+ )
+
+ if sg_mroute is not None:
+ if show_mroute_summary_json["sourceGroup"]["installed"] != sg_mroute:
+ logger.error(
+ "Number of installed SG routes are: %s but expected: %s",
+ show_mroute_summary_json["sourceGroup"]["installed"],
+ sg_mroute,
+ )
+ return False
+ logger.info(
+ "Number of installed SG routes are %s",
+ show_mroute_summary_json["sourceGroup"]["installed"],
+ )
+
+ if total_mroute is not None:
+ if show_mroute_summary_json["totalNumOfInstalledMroutes"] != total_mroute:
+ logger.error(
+ "Total number of installed mroutes are: %s but expected: %s",
+ show_mroute_summary_json["totalNumOfInstalledMroutes"],
+ total_mroute,
+ )
+ return False
+ logger.info(
+ "Number of installed Total mroute are %s",
+ show_mroute_summary_json["totalNumOfInstalledMroutes"],
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+def verify_sg_traffic(tgen, dut, groups, src, addr_type="ipv4"):
+ """
+ Verify multicast traffic by running
+ "show ip mroute count json" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `groups`: igmp or mld groups where traffic needs to be verified
+
+ Usage
+ -----
+ result = verify_sg_traffic(tgen, "r1", igmp_groups, srcaddress)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying multicast " "SG traffic", dut)
+
+ if addr_type == "ipv4":
+ cmd = "show ip mroute count json"
+ elif addr_type == "ipv6":
+ cmd = "show ipv6 mroute count json"
+ # import pdb; pdb.set_trace()
+ show_mroute_sg_traffic_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ if bool(show_mroute_sg_traffic_json) is False:
+ errormsg = "[DUT %s]: Json output is empty" % (dut)
+ return errormsg
+
+ before_traffic = {}
+ after_traffic = {}
+
+ for grp in groups:
+ if grp not in show_mroute_sg_traffic_json:
+ errormsg = "[DUT %s]: Verifying (%s, %s) mroute," "[FAILED]!! " % (
+ dut,
+ src,
+ grp,
+ )
+ if src not in show_mroute_sg_traffic_json[grp]:
+ errormsg = (
+ "[DUT %s]: Verifying source is not present in "
+ " %s [FAILED]!! " % (dut, src)
+ )
+ return errormsg
+
+ before_traffic[grp] = show_mroute_sg_traffic_json[grp][src]["packets"]
+
+ logger.info("Waiting for 10sec traffic to increament")
+ sleep(10)
+
+ show_mroute_sg_traffic_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ for grp in groups:
+ if grp not in show_mroute_sg_traffic_json:
+ errormsg = "[DUT %s]: Verifying (%s, %s) mroute," "[FAILED]!! " % (
+ dut,
+ src,
+ grp,
+ )
+ if src not in show_mroute_sg_traffic_json[grp]:
+ errormsg = (
+ "[DUT %s]: Verifying source is not present in "
+ " %s [FAILED]!! " % (dut, src)
+ )
+ return errormsg
+
+ after_traffic[grp] = show_mroute_sg_traffic_json[grp][src]["packets"]
+
+ for grp in groups:
+ if after_traffic[grp] < before_traffic[grp]:
+ errormsg = (
+ "[DUT %s]: Verifying igmp group %s source %s not increamenting traffic"
+ " [FAILED]!! " % (dut, grp, src)
+ )
+ return errormsg
+ else:
+ logger.info(
+ "[DUT %s]:igmp group %s source %s receiving traffic"
+ " [PASSED]!! " % (dut, grp, src)
+ )
+ result = True
+
+ return result
+
# def cleanup(self):
# super(McastTesterHelper, self).cleanup()
diff --git a/tests/topotests/multicast_pim6_static_rp_topo1/__init__.py b/tests/topotests/multicast_pim6_static_rp_topo1/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/topotests/multicast_pim6_static_rp_topo1/__init__.py
diff --git a/tests/topotests/multicast_pim_static_rp_topo1/multicast_pimv6_static_rp.json b/tests/topotests/multicast_pim6_static_rp_topo1/multicast_pim6_static_rp.json
index 9edfae4a24..9edfae4a24 100644
--- a/tests/topotests/multicast_pim_static_rp_topo1/multicast_pimv6_static_rp.json
+++ b/tests/topotests/multicast_pim6_static_rp_topo1/multicast_pim6_static_rp.json
diff --git a/tests/topotests/multicast_pim6_static_rp_topo1/test_multicast_pim6_static_rp1.py b/tests/topotests/multicast_pim6_static_rp_topo1/test_multicast_pim6_static_rp1.py
new file mode 100755
index 0000000000..dd8818e92c
--- /dev/null
+++ b/tests/topotests/multicast_pim6_static_rp_topo1/test_multicast_pim6_static_rp1.py
@@ -0,0 +1,1321 @@
+#!/usr/bin/env python
+
+#
+# Copyright (c) 2022 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation,
+# Inc. ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test Multicast basic functionality:
+
+Topology:
+
+ _______r2_____
+ | |
+ iperf | | iperf
+ r0-----r1-------------r3-----r5
+ | |
+ |_____________|
+ r4
+
+Test steps
+- Create topology (setup module)
+- Bring up topology
+
+1. Verify upstream interfaces(IIF) and join state are updated
+ properly after adding and deleting the static RP
+2. Verify IIF and OIL in "show ipv6 PIM6 state" updated properly when
+ RP becomes unreachable
+3. Verify RP becomes reachable after MLD join received, PIM6 join
+ towards RP is sent immediately
+4. Verify (*,G) and (S,G) populated correctly when SPT and RPT
+ share the same path
+5. Verify OIF and RPF for (*,G) and (S,G) when static RP configure
+ in LHR router
+6. Verify OIF and RFP for (*,G) and (S,G) when static RP configure
+ in FHR router
+7. Verify (*,G) and (S,G) populated correctly when RPT and SPT path
+ are different
+8. Verify PIM6 join send towards the higher preferred RP
+9. Verify PIM6 prune send towards the lower preferred RP
+"""
+
+import os
+import sys
+import json
+import time
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+sys.path.append(os.path.join(CWD, "../lib/"))
+
+# Required to instantiate the topology builder class.
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib.topogen import Topogen, get_topogen
+
+from lib.common_config import (
+ start_topology,
+ write_test_header,
+ write_test_footer,
+ reset_config_on_routers,
+ step,
+ shutdown_bringup_interface,
+ kill_router_daemons,
+ start_router_daemons,
+ create_static_routes,
+ check_router_status,
+ socat_send_mld_join,
+ socat_send_pim6_traffic,
+ kill_socat,
+ topo_daemons,
+)
+from lib.pim import (
+ create_pim_config,
+ verify_upstream_iif,
+ verify_join_state_and_timer,
+ verify_mroutes,
+ verify_pim_neighbors,
+ verify_pim_interface_traffic,
+ verify_pim_rp_info,
+ verify_pim_state,
+ clear_pim6_interface_traffic,
+ clear_pim6_mroute,
+ verify_pim6_neighbors,
+ get_pim6_interface_traffic,
+ clear_pim6_interfaces,
+ verify_mld_groups,
+)
+from lib.topolog import logger
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Global variables
+GROUP_RANGE_1 = "ff08::/64"
+GROUP_ADDRESS_1 = "ff08::1"
+GROUP_RANGE_3 = "ffaa::/64"
+GROUP_ADDRESS_3 = "ffaa::1"
+GROUP_RANGE_4 = "ff00::/8"
+GROUP_ADDRESS_4 = "ff00::1"
+STAR = "*"
+SOURCE = "Static"
+ASSERT_MSG = "Testcase {} : Failed Error: {}"
+
+pytestmark = [pytest.mark.pim6d]
+
+
+def build_topo(tgen):
+ """Build function"""
+
+ # Building topology from json file
+ build_topo_from_json(tgen, TOPO)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: %s", testsuite_run_time)
+ logger.info("=" * 40)
+
+ topology = """
+
+ _______r2_____
+ | |
+ iperf | | iperf
+ r0-----r1-------------r3-----r5
+ | |
+ |_____________|
+ r4
+
+ """
+ logger.info("Master Topology: \n %s", topology)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ json_file = "{}/multicast_pim6_static_rp.json".format(CWD)
+ tgen = Topogen(json_file, mod.__name__)
+ global TOPO
+ TOPO = tgen.json_topo
+
+ # ... and here it calls Mininet initialization functions.
+
+ # get list of daemons needs to be started for this suite.
+ daemons = topo_daemons(tgen, TOPO)
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start daemons and then start routers
+ start_topology(tgen, daemons)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, TOPO)
+
+ # Verify PIM6 neighbors
+ result = verify_pim6_neighbors(tgen, TOPO)
+ assert result is True, "setup_module :Failed \n Error:" " {}".format(result)
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module():
+ """Teardown the pytest environment"""
+
+ logger.info("Running teardown_module to delete topology")
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info("Testsuite end time: %s", time.asctime(time.localtime(time.time())))
+ logger.info("=" * 40)
+
+
+#####################################################
+#
+# Local API
+#
+#####################################################
+
+
+def verify_state_incremented(state_before, state_after):
+ """
+ API to compare interface traffic state incrementing
+
+ Parameters
+ ----------
+ * `state_before` : State dictionary for any particular instance
+ * `state_after` : State dictionary for any particular instance
+ """
+
+ for router, state_data in state_before.items():
+ for state, value in state_data.items():
+ if state_before[router][state] >= state_after[router][state]:
+ errormsg = (
+ "[DUT: %s]: state %s value has not"
+ " incremented, Initial value: %s, "
+ "Current value: %s [FAILED!!]"
+ % (
+ router,
+ state,
+ state_before[router][state],
+ state_after[router][state],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT: %s]: State %s value is "
+ "incremented, Initial value: %s, Current value: %s"
+ " [PASSED!!]",
+ router,
+ state,
+ state_before[router][state],
+ state_after[router][state],
+ )
+
+ return True
+
+
+#####################################################
+#
+# Testcases
+#
+#####################################################
+
+
+def test_pim6_add_delete_static_RP_p0(request):
+ """
+ Verify upstream interfaces(IIF) and join state are updated
+ properly after adding and deleting the static RP
+ Verify IIF and OIL in "show ipv6 PIM6 state" updated properly when
+ RP becomes unreachable
+ Verify RP becomes reachable after MLD join received, PIM6 join
+ towards RP is sent immediately
+
+ TOPOlogy used:
+ r0------r1-----r2
+ iperf DUT RP
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ check_router_status(tgen)
+
+ step("Creating configuration from JSON")
+ kill_socat(tgen)
+ clear_pim6_mroute(tgen)
+ clear_pim6_interface_traffic(tgen, TOPO)
+ reset_config_on_routers(tgen)
+
+ step("Shut link b/w R1 and R3 and R1 and R4 as per testcase topology")
+ intf_r1_r3 = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ intf_r1_r4 = TOPO["routers"]["r1"]["links"]["r4"]["interface"]
+ for intf in [intf_r1_r3, intf_r1_r4]:
+ shutdown_bringup_interface(tgen, "r1", intf, ifaceaction=False)
+
+ step("Enable PIM6 between r1 and r2")
+ step("Enable MLD on r1 interface and send MLD " "join {} to r1".\
+ format(GROUP_RANGE_1))
+ step("Configure r2 loopback interface as RP")
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_1,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify show ipv6 mld group without any MLD join")
+ dut = "r1"
+ intf_r1_r0 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, GROUP_ADDRESS_1, expected=False)
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: mld group present without any MLD join \n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("Verify show ipv6 PIM6 interface traffic without any mld join")
+ state_dict = {
+ "r1": {TOPO["routers"]["r1"]["links"]["r2"]["interface"]: ["pruneTx"]}
+ }
+
+ state_before = verify_pim_interface_traffic(tgen, state_dict, addr_type="ipv6")
+ assert isinstance(
+ state_before, dict
+ ), "Testcase{} : Failed \n state_before is not dictionary \n " "Error: {}".format(
+ tc_name, result
+ )
+
+ step("send mld join {} to R1".format(GROUP_ADDRESS_1))
+ intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
+ intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
+ result = socat_send_mld_join(
+ tgen, "r0", "UDP6-RECV", GROUP_ADDRESS_1, intf, intf_ip
+ )
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify MLD groups")
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify RP info")
+ dut = "r1"
+ oif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ iif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ rp_address = TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(tgen, TOPO, dut, GROUP_RANGE_1, oif, rp_address, SOURCE)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify upstream IIF interface")
+ result = verify_upstream_iif(tgen, dut, oif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify upstream join state and join timer")
+ result = verify_join_state_and_timer(tgen, dut, oif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify PIM6 state")
+ result = verify_pim_state(tgen, dut, oif, iif, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify ip mroutes")
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, oif, iif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Delete RP configuration")
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_1,
+ "delete": True,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify RP info")
+ result = verify_pim_rp_info(
+ tgen, TOPO, dut, GROUP_RANGE_1, oif, rp_address, SOURCE, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} :Failed \n " "RP: {} info is still present \n Error: {}".format(
+ tc_name, rp_address, result
+ )
+
+ step("r1: Verify upstream IIF interface")
+ result = verify_upstream_iif(tgen, dut, oif, STAR, GROUP_ADDRESS_1, expected=False)
+ assert result is not True, (
+ "Testcase {} :Failed \n "
+ "Upstream ({}, {}) is still in join state \n Error: {}".format(
+ tc_name, STAR, GROUP_ADDRESS_1, result
+ )
+ )
+
+ step("r1: Verify upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, oif, STAR, GROUP_ADDRESS_1, expected=False
+ )
+ assert result is not True, (
+ "Testcase {} :Failed \n "
+ "Upstream ({}, {}) timer is still running \n Error: {}".format(
+ tc_name, STAR, GROUP_ADDRESS_1, result
+ )
+ )
+
+ step("r1: Verify PIM6 state")
+ result = verify_pim_state(tgen, dut, oif, iif, GROUP_ADDRESS_1, expected=False)
+ assert result is not True, (
+ "Testcase {} :Failed \n "
+ "PIM state for group: {} is still Active \n Error: {}".format(
+ tc_name, GROUP_ADDRESS_1, result
+ )
+ )
+
+ step("r1: Verify ip mroutes")
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, oif, iif, expected=False)
+ assert result is not True, (
+ "Testcase {} :Failed \n "
+ "mroute ({}, {}) is still present \n Error: {}".format(
+ tc_name, STAR, GROUP_ADDRESS_1, result
+ )
+ )
+
+ step("r1: Verify show ipv6 PIM6 interface traffic without any MLD join")
+ state_after = verify_pim_interface_traffic(tgen, state_dict, addr_type="ipv6")
+ assert isinstance(
+ state_after, dict
+ ), "Testcase{} : Failed \n state_before is not dictionary \n " "Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_state_incremented(state_before, state_after)
+ assert result is True, "Testcase{} : Failed Error: {}".format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_pim6_SPT_RPT_path_same_p1(request):
+ """
+ Verify (*,G) and (S,G) populated correctly when SPT and RPT
+ share the same path
+
+ Topology used:
+ ________r2_____
+ | |
+ iperf | | iperf
+ r0-----r1 r3-----r5
+
+ r1 : LHR
+ r2 : RP
+ r3 : FHR
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Creating configuration from JSON")
+ kill_socat(tgen)
+ clear_pim6_mroute(tgen)
+ clear_pim6_interface_traffic(tgen, TOPO)
+ reset_config_on_routers(tgen)
+
+ step("Shut link b/w R1->R3, R1->R4 and R3->R1, R3->R4 as per " "testcase topology")
+ intf_r1_r3 = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ intf_r1_r4 = TOPO["routers"]["r1"]["links"]["r4"]["interface"]
+ intf_r3_r1 = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ intf_r3_r4 = TOPO["routers"]["r3"]["links"]["r4"]["interface"]
+ for intf in [intf_r1_r3, intf_r1_r4]:
+ shutdown_bringup_interface(tgen, "r1", intf, ifaceaction=False)
+
+ for intf in [intf_r3_r1, intf_r3_r4]:
+ shutdown_bringup_interface(tgen, "r3", intf, ifaceaction=False)
+
+ step("Enable the PIM6 on all the interfaces of r1, r2, r3 and r4 routers")
+ step("Configure RP on r2 (loopback interface) for the group range {}".\
+ format(GROUP_ADDRESS_1))
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_1,
+ }
+ ]
+ }
+ }
+ }
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("Enable MLD on r1 interface and send MLD join {} to R1".format(GROUP_ADDRESS_1))
+ intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
+ intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
+ result = socat_send_mld_join(
+ tgen, "r0", "UDP6-RECV", GROUP_ADDRESS_1, intf, intf_ip
+ )
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify MLD groups")
+ dut = "r1"
+ intf_r1_r0 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("Send multicast traffic from R5")
+ intf = TOPO["routers"]["r5"]["links"]["r3"]["interface"]
+ SOURCE_ADDRESS = TOPO["routers"]["r5"]["links"]["r3"]["ipv6"].split("/")[0]
+ result = socat_send_pim6_traffic(tgen, "r5", "UDP6-SEND", GROUP_ADDRESS_1, intf)
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r2: Verify RP info")
+ dut = "r2"
+ oif = "lo"
+ rp_address = TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(tgen, TOPO, dut, GROUP_RANGE_1, oif, rp_address, SOURCE)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream IIF interface")
+ dut = "r2"
+ iif = "lo"
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r2"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r2"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1, expected=False
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: (S, G) upstream join state is up and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r2"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_pim6_RP_configured_as_LHR_p1(request):
+ """
+ Verify OIF and RPF for (*,G) and (S,G) when static RP configure
+ in LHR router
+
+ Topology used:
+ ________r2_____
+ | |
+ iperf | | iperf
+ r0-----r1-------------r3-----r5
+
+ r1 : LHR/RP
+ r3 : FHR
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Creating configuration from JSON")
+ kill_socat(tgen)
+ clear_pim6_mroute(tgen)
+ clear_pim6_interface_traffic(tgen, TOPO)
+ reset_config_on_routers(tgen)
+
+ step("Enable MLD on r1 interface")
+ step("Enable the PIM6 on all the interfaces of r1, r2, r3 and r4 routers")
+
+ step("r1: Configure r1(LHR) as RP")
+ input_dict = {
+ "r1": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r1"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_1,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1: Shut not Shut loopback interface")
+ shutdown_bringup_interface(tgen, "r1", "lo", False)
+ shutdown_bringup_interface(tgen, "r1", "lo", True)
+
+ step("r1: Verify RP info")
+ dut = "r1"
+ iif = "lo"
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ rp_address = TOPO["routers"]["r1"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(tgen, TOPO, dut, GROUP_RANGE_1, iif, rp_address, SOURCE)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("send mld join {} to R1".format(GROUP_ADDRESS_1))
+ intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
+ intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
+ result = socat_send_mld_join(
+ tgen, "r0", "UDP6-RECV", GROUP_ADDRESS_1, intf, intf_ip
+ )
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify MLD groups")
+ dut = "r1"
+ intf_r1_r0 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r5: Send multicast traffic for group {}".format(GROUP_ADDRESS_1))
+ intf = TOPO["routers"]["r5"]["links"]["r3"]["interface"]
+ SOURCE_ADDRESS = TOPO["routers"]["r5"]["links"]["r3"]["ipv6"].split("/")[0]
+ result = socat_send_pim6_traffic(tgen, "r5", "UDP6-SEND", GROUP_ADDRESS_1, intf)
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ SOURCE_ADDRESS = TOPO["routers"]["r5"]["links"]["r3"]["ipv6"].split("/")[0]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1, expected=False
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: (S, G) upstream join state is joined and join"
+ " timer is running \n Error: {}".format(tc_name, result)
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_pim6_RP_configured_as_FHR_p1(request):
+ """
+ Verify OIF and RFP for (*,G) and (S,G) when static RP configure
+ in FHR router
+
+ Topology used:
+ ________r2_____
+ | |
+ iperf | | iperf
+ r0-----r1-------------r3-----r5
+
+ r1 : LHR
+ r3 : FHR/RP
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Creating configuration from JSON")
+ kill_socat(tgen)
+ clear_pim6_mroute(tgen)
+ clear_pim6_interface_traffic(tgen, TOPO)
+ reset_config_on_routers(tgen)
+
+ step("Enable MLD on r1 interface")
+ step("Enable the PIM6 on all the interfaces of r1, r2, r3 and r4 routers")
+ step("r3: Configure r3(FHR) as RP")
+ input_dict = {
+ "r3": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r3"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_1,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify RP info")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ rp_address = TOPO["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(tgen, TOPO, dut, GROUP_RANGE_1, iif, rp_address, SOURCE)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("send mld join {} to R1".format(GROUP_ADDRESS_1))
+ intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
+ intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
+ result = socat_send_mld_join(
+ tgen, "r0", "UDP6-RECV", GROUP_ADDRESS_1, intf, intf_ip
+ )
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify MLD groups")
+ dut = "r1"
+ intf_r1_r0 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r5: Send multicast traffic for group {}".format(GROUP_ADDRESS_1))
+ intf = TOPO["routers"]["r5"]["links"]["r3"]["interface"]
+ SOURCE_ADDRESS = TOPO["routers"]["r5"]["links"]["r3"]["ipv6"].split("/")[0]
+ result = socat_send_pim6_traffic(tgen, "r5", "UDP6-SEND", GROUP_ADDRESS_1, intf)
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ SOURCE_ADDRESS = TOPO["routers"]["r5"]["links"]["r3"]["ipv6"].split("/")[0]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1, expected=False
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_pim6_SPT_RPT_path_different_p1(request):
+ """
+ Verify (*,G) and (S,G) populated correctly when RPT and SPT path
+ are different
+
+ Topology used:
+ ________r2_____
+ | |
+ iperf | | iperf
+ r0-----r1-------------r3-----r5
+
+ r1: LHR
+ r2: RP
+ r3: FHR
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Creating configuration from JSON")
+ kill_socat(tgen)
+ clear_pim6_mroute(tgen)
+ clear_pim6_interface_traffic(tgen, TOPO)
+ reset_config_on_routers(tgen)
+
+ step("Enable MLD on r1 interface")
+ step("Enable the PIM6 on all the interfaces of r1, r2, r3 and r4 routers")
+ step("r2: Configure r2 as RP")
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_1,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r2: Verify RP info")
+ dut = "r2"
+ iif = "lo"
+ rp_address = TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(
+ tgen, TOPO, dut, GROUP_ADDRESS_1, iif, rp_address, SOURCE
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("send mld join {} to R1".format(GROUP_ADDRESS_1))
+ intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
+ intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
+ result = socat_send_mld_join(
+ tgen, "r0", "UDP6-RECV", GROUP_ADDRESS_1, intf, intf_ip
+ )
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify MLD groups")
+ dut = "r1"
+ intf_r1_r0 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r5: Send multicast traffic for group {}".format(GROUP_ADDRESS_1))
+ intf = TOPO["routers"]["r5"]["links"]["r3"]["interface"]
+ SOURCE_ADDRESS = TOPO["routers"]["r5"]["links"]["r3"]["ipv6"].split("/")[0]
+ result = socat_send_pim6_traffic(tgen, "r5", "UDP6-SEND", GROUP_ADDRESS_1, intf)
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ iif = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ SOURCE_ADDRESS = TOPO["routers"]["r5"]["links"]["r3"]["ipv6"].split("/")[0]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream IIF interface")
+ dut = "r2"
+ iif = "lo"
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(tgen, dut, iif, STAR, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r2"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1, expected=False
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream IIF interface")
+ dut = "r2"
+ iif = TOPO["routers"]["r2"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1, joinState="NotJoined"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_1, expected=False
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r2: Verify (S, G) ip mroutes")
+ oif = "none"
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_pim6_send_join_on_higher_preffered_rp_p1(request):
+ """
+ Verify PIM6 join send towards the higher preferred RP
+ Verify PIM6 prune send towards the lower preferred RP
+
+ Topology used:
+ _______r2
+ |
+ iperf |
+ r0-----r1
+ |
+ |_______r4
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Creating configuration from JSON")
+ kill_socat(tgen)
+ clear_pim6_mroute(tgen)
+ clear_pim6_interface_traffic(tgen, TOPO)
+ reset_config_on_routers(tgen)
+
+ step("Enable MLD on r1 interface")
+ step("Enable the PIM66 on all the interfaces of r1, r2, r3 and r4 routers")
+ step("Configure RP on r2 (loopback interface) for the group range {}".\
+ format(GROUP_RANGE_4))
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_4,
+ }
+ ]
+ }
+ }
+ }
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r3 : Make all interface not reachable")
+ intf_r3_r1 = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ intf_r3_r2 = TOPO["routers"]["r3"]["links"]["r2"]["interface"]
+ intf_r3_r4 = TOPO["routers"]["r3"]["links"]["r4"]["interface"]
+ intf_r1_r3 = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ intf_r2_r3 = TOPO["routers"]["r2"]["links"]["r3"]["interface"]
+ intf_r4_r3 = TOPO["routers"]["r4"]["links"]["r3"]["interface"]
+
+ for dut, intf in zip(["r1", "r2", "r3"], [intf_r1_r3, intf_r2_r3, intf_r4_r3]):
+ shutdown_bringup_interface(tgen, dut, intf, ifaceaction=False)
+
+ for intf in [intf_r3_r1, intf_r3_r4, intf_r3_r4]:
+ shutdown_bringup_interface(tgen, "r3", intf, ifaceaction=False)
+
+ step("Verify show ipv6 PIM6 interface traffic without any mld join")
+ state_dict = {"r1": {TOPO["routers"]["r1"]["links"]["r4"]["interface"]: ["joinTx"]}}
+
+ state_before = get_pim6_interface_traffic(tgen, state_dict)
+ assert isinstance(
+ state_before, dict
+ ), "Testcase{} : Failed \n state_before is not dictionary \n " "Error: {}".format(
+ tc_name, result
+ )
+
+ step("r0: send mld join {} to R1".format(GROUP_ADDRESS_3))
+ intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
+ intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
+ result = socat_send_mld_join(
+ tgen, "r0", "UDP6-RECV", GROUP_ADDRESS_3, intf, intf_ip
+ )
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify MLD groups")
+ dut = "r1"
+ intf_r1_r0 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, GROUP_ADDRESS_3)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("Configure RP on r4 (loopback interface) for the group range " "ffaa::/128")
+ input_dict = {
+ "r4": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r4"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_3,
+ }
+ ]
+ }
+ }
+ }
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1 : Verify RP info for group {}".format(GROUP_ADDRESS_4))
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ rp_address_1 = TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(
+ tgen, TOPO, dut, GROUP_ADDRESS_4, iif, rp_address_1, SOURCE
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1 : Verify RP info for group {}".format(GROUP_ADDRESS_3))
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r4"]["interface"]
+ rp_address_2 = TOPO["routers"]["r4"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(
+ tgen, TOPO, dut, GROUP_ADDRESS_3, iif, rp_address_2, SOURCE
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1 : Verify join is sent to higher preferred RP")
+ step("r1 : Verify prune is sent to lower preferred RP")
+ state_after = get_pim6_interface_traffic(tgen, state_dict)
+ assert isinstance(
+ state_after, dict
+ ), "Testcase{} : Failed \n state_before is not dictionary \n " "Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_state_incremented(state_before, state_after)
+ assert result is True, "Testcase{} : Failed Error: {}".format(tc_name, result)
+
+ step("r1 : Verify ip mroutes")
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_3, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1 : Verify PIM6 state")
+ result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS_3)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1 : Verify upstream IIF interface")
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_3)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1 : Verify upstream join state and join timer")
+ result = verify_join_state_and_timer(tgen, dut, iif, STAR, GROUP_ADDRESS_3)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ clear_pim6_interface_traffic(tgen, TOPO)
+
+ step("r1 : Verify joinTx, pruneTx count before RP gets deleted")
+ state_dict = {
+ "r1": {
+ TOPO["routers"]["r1"]["links"]["r2"]["interface"]: ["joinTx"],
+ TOPO["routers"]["r1"]["links"]["r4"]["interface"]: ["pruneTx"],
+ }
+ }
+ state_before = get_pim6_interface_traffic(tgen, state_dict)
+ assert isinstance(
+ state_before, dict
+ ), "Testcase{} : Failed \n state_before is not dictionary \n " "Error: {}".format(
+ tc_name, result
+ )
+
+ step("r1 : Delete RP configuration for {}".format(GROUP_RANGE_3))
+ input_dict = {
+ "r4": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r4"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_3,
+ "delete": True,
+ }
+ ]
+ }
+ }
+ }
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1 : Verify rp-info for group {}".format(GROUP_RANGE_3))
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ result = verify_pim_rp_info(
+ tgen, TOPO, dut, GROUP_RANGE_3, iif, rp_address_1, SOURCE
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1 : Verify rp-info for group {}".format(GROUP_RANGE_4))
+ iif = TOPO["routers"]["r1"]["links"]["r4"]["interface"]
+ result = verify_pim_rp_info(
+ tgen, TOPO, dut, GROUP_RANGE_4, oif, rp_address_2, SOURCE, expected=False
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: rp-info is present for group {} \n Error: {}".format(tc_name,
+ GROUP_RANGE_4,
+ result)
+ )
+
+ step(
+ "r1 : Verify RPF interface updated in mroute when higher preferred"
+ "RP gets deleted"
+ )
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_3, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+ logger.info("Expected behavior: %s", result)
+
+ step(
+ "r1 : Verify IIF and OIL in show ipv6 PIM6 state updated when higher"
+ "preferred overlapping RP is deleted"
+ )
+ result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS_3)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step(
+ "r1 : Verify upstream IIF updated when higher preferred overlapping"
+ "RP deleted"
+ )
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_3)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step(
+ "r1 : Verify upstream join state and join timer updated when higher"
+ "preferred overlapping RP deleted"
+ )
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, GROUP_ADDRESS_3, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step(
+ "r1 : Verify join is sent to lower preferred RP, when higher"
+ "preferred RP gets deleted"
+ )
+ step(
+ "r1 : Verify prune is sent to higher preferred RP when higher"
+ " preferred RP gets deleted"
+ )
+ state_after = get_pim6_interface_traffic(tgen, state_dict)
+ assert isinstance(
+ state_after, dict
+ ), "Testcase{} : Failed \n state_before is not dictionary \n " "Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_state_incremented(state_before, state_after)
+ assert result is True, "Testcase{} : Failed Error: {}".format(tc_name, result)
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/multicast_pim6_static_rp_topo1/test_multicast_pim6_static_rp2.py b/tests/topotests/multicast_pim6_static_rp_topo1/test_multicast_pim6_static_rp2.py
new file mode 100755
index 0000000000..f366708ece
--- /dev/null
+++ b/tests/topotests/multicast_pim6_static_rp_topo1/test_multicast_pim6_static_rp2.py
@@ -0,0 +1,1324 @@
+#!/usr/bin/env python
+
+#
+# Copyright (c) 2022 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation,
+# Inc. ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test Multicast basic functionality:
+
+Topology:
+
+ _______r2_____
+ | |
+ iperf | | iperf
+ r0-----r1-------------r3-----r5
+ | |
+ |_____________|
+ r4
+
+Test steps
+- Create topology (setup module)
+- Bring up topology
+
+1. Configure multiple groups (10 grps) with same RP address
+2. Verify IIF and OIL in updated in mroute when upstream interface
+ configure as RP
+3. Verify RP info and (*,G) mroute after deleting the RP and shut /
+ no shut the RPF interface.
+"""
+
+import os
+import sys
+import json
+import time
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+sys.path.append(os.path.join(CWD, "../lib/"))
+
+# Required to instantiate the topology builder class.
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib.topogen import Topogen, get_topogen
+
+from lib.common_config import (
+ start_topology,
+ write_test_header,
+ write_test_footer,
+ reset_config_on_routers,
+ step,
+ shutdown_bringup_interface,
+ kill_router_daemons,
+ start_router_daemons,
+ create_static_routes,
+ check_router_status,
+ socat_send_mld_join,
+ socat_send_pim6_traffic,
+ kill_socat,
+ topo_daemons,
+)
+from lib.pim import (
+ create_pim_config,
+ verify_upstream_iif,
+ verify_join_state_and_timer,
+ verify_mroutes,
+ verify_pim_neighbors,
+ verify_pim_interface_traffic,
+ verify_pim_rp_info,
+ verify_pim_state,
+ clear_pim6_interface_traffic,
+ clear_pim6_mroute,
+ verify_pim6_neighbors,
+ get_pim6_interface_traffic,
+ clear_pim6_interfaces,
+ verify_mld_groups,
+)
+from lib.topolog import logger
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Global variables
+GROUP_RANGE_1 = "ff08::/64"
+GROUP_ADDRESS_1 = "ff08::1"
+GROUP_RANGE_3 = "ffaa::/64"
+GROUP_ADDRESS_3 = "ffaa::1"
+GROUP_RANGE_LIST_1 = [
+ "ffaa::1/128",
+ "ffaa::2/128",
+ "ffaa::3/128",
+ "ffaa::4/128",
+ "ffaa::5/128",
+]
+GROUP_RANGE_LIST_2 = [
+ "ffaa::6/128",
+ "ffaa::7/128",
+ "ffaa::8/128",
+ "ffaa::9/128",
+ "ffaa::10/128",
+]
+GROUP_ADDRESS_LIST_1 = ["ffaa::1", "ffaa::2", "ffaa::3", "ffaa::4", "ffaa::5"]
+GROUP_ADDRESS_LIST_2 = ["ffaa::6", "ffaa::7", "ffaa::8", "ffaa::9", "ffaa::10"]
+STAR = "*"
+SOURCE = "Static"
+ASSERT_MSG = "Testcase {} : Failed Error: {}"
+
+pytestmark = [pytest.mark.pim6d]
+
+
+def build_topo(tgen):
+ """Build function"""
+
+ # Building topology from json file
+ build_topo_from_json(tgen, TOPO)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: %s", testsuite_run_time)
+ logger.info("=" * 40)
+
+ topology = """
+
+ _______r2_____
+ | |
+ iperf | | iperf
+ r0-----r1-------------r3-----r5
+ | |
+ |_____________|
+ r4
+
+ """
+ logger.info("Master Topology: \n %s", topology)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ json_file = "{}/multicast_pim6_static_rp.json".format(CWD)
+ tgen = Topogen(json_file, mod.__name__)
+ global TOPO
+ TOPO = tgen.json_topo
+
+ # ... and here it calls Mininet initialization functions.
+
+ # get list of daemons needs to be started for this suite.
+ daemons = topo_daemons(tgen, TOPO)
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start daemons and then start routers
+ start_topology(tgen, daemons)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, TOPO)
+
+ # Verify PIM6 neighbors
+ result = verify_pim6_neighbors(tgen, TOPO)
+ assert result is True, "setup_module :Failed \n Error:" " {}".format(result)
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module():
+ """Teardown the pytest environment"""
+
+ logger.info("Running teardown_module to delete topology")
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info("Testsuite end time: %s", time.asctime(time.localtime(time.time())))
+ logger.info("=" * 40)
+
+
+#####################################################
+#
+# Local API
+#
+#####################################################
+
+
+def verify_state_incremented(state_before, state_after):
+ """
+ API to compare interface traffic state incrementing
+
+ Parameters
+ ----------
+ * `state_before` : State dictionary for any particular instance
+ * `state_after` : State dictionary for any particular instance
+ """
+
+ for router, state_data in state_before.items():
+ for state, value in state_data.items():
+ if state_before[router][state] >= state_after[router][state]:
+ errormsg = (
+ "[DUT: %s]: state %s value has not"
+ " incremented, Initial value: %s, "
+ "Current value: %s [FAILED!!]"
+ % (
+ router,
+ state,
+ state_before[router][state],
+ state_after[router][state],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT: %s]: State %s value is "
+ "incremented, Initial value: %s, Current value: %s"
+ " [PASSED!!]",
+ router,
+ state,
+ state_before[router][state],
+ state_after[router][state],
+ )
+
+ return True
+
+
+#####################################################
+#
+# Testcases
+#
+#####################################################
+
+def test_pim6_multiple_groups_same_RP_address_p2(request):
+ """
+ Configure multiple groups (10 grps) with same RP address
+
+ Topology used:
+ ________r2_____
+ | |
+ iperf | | iperf
+ r0-----r1-------------r3-----r5
+
+ r1 : LHR
+ r2 : RP
+ r3 : FHR
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Creating configuration from JSON")
+ kill_socat(tgen)
+ clear_pim6_mroute(tgen)
+ clear_pim6_interface_traffic(tgen, TOPO)
+ reset_config_on_routers(tgen)
+
+ step("Enable MLD on r1 interface")
+ step("Enable the PIM6 on all the interfaces of r1, r2, r3 and r4 routers")
+ step("r2: Configure r2 as RP")
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_3,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r2: verify rp-info")
+ dut = "r2"
+ oif = "lo"
+ rp_address = TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(tgen, TOPO, dut, GROUP_RANGE_3, oif, rp_address, SOURCE)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ group_address_list = GROUP_ADDRESS_LIST_1 + GROUP_ADDRESS_LIST_2
+ step("r0: Send MLD join for 10 groups")
+ intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
+ intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
+ result = socat_send_mld_join(
+ tgen, "r0", "UDP6-RECV", group_address_list, intf, intf_ip
+ )
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify MLD groups")
+ dut = "r1"
+ intf_r1_r0 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r5: Send multicast traffic for group {}".format(group_address_list))
+ intf = TOPO["routers"]["r5"]["links"]["r3"]["interface"]
+ SOURCE_ADDRESS = TOPO["routers"]["r5"]["links"]["r3"]["ipv6"].split("/")[0]
+ result = socat_send_pim6_traffic(tgen, "r5", "UDP6-SEND", group_address_list, intf)
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, STAR, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, group_address_list, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, group_address_list, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ iif = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, group_address_list, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, group_address_list, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream IIF interface")
+ dut = "r2"
+ iif = "lo"
+ result = verify_upstream_iif(tgen, dut, iif, STAR, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, group_address_list, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r2"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, group_address_list, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ group_address_list,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, group_address_list, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream IIF interface")
+ dut = "r2"
+ iif = TOPO["routers"]["r2"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(
+ tgen, dut, iif, SOURCE_ADDRESS, group_address_list, joinState="NotJoined"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ group_address_list,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r2: Verify (S, G) ip mroutes")
+ oif = "none"
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, group_address_list, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Delete RP configuration")
+ input_dict = {
+ "r1": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_3,
+ "delete": True,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1: Shut the interface r1-r2-eth1 from R1 to R2")
+ dut = "r1"
+ intf = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ shutdown_bringup_interface(tgen, dut, intf, False)
+
+ step("r1: No Shut the interface r1-r2-eth1 from R1 to R2")
+ shutdown_bringup_interface(tgen, dut, intf, True)
+
+ step("r1: Configure RP")
+ input_dict = {
+ "r1": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_3,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1: Shut the interface r1-r0-eth0 from R1 to R2")
+ intf = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ shutdown_bringup_interface(tgen, dut, intf, False)
+
+ step("r1: No Shut the interface r1-r0-eth0 from R1 to R2")
+ shutdown_bringup_interface(tgen, dut, intf, True)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, STAR, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, group_address_list, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, group_address_list, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, group_address_list, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, group_address_list, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream IIF interface")
+ dut = "r2"
+ iif = "lo"
+ result = verify_upstream_iif(tgen, dut, iif, STAR, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, group_address_list, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r2"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, group_address_list, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ group_address_list,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, group_address_list, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_pim6_multiple_groups_different_RP_address_p2(request):
+ """
+ Verify IIF and OIL in updated in mroute when upstream interface
+ configure as RP
+
+ Topology used:
+ ________r2_____
+ | |
+ iperf | | iperf
+ r0-----r1-------------r3-----r5
+ | |
+ |_____________|
+ r4
+ r1 : LHR
+ r2 & r4 : RP
+ r3 : FHR
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Creating configuration from JSON")
+ kill_socat(tgen)
+ clear_pim6_mroute(tgen)
+ clear_pim6_interface_traffic(tgen, TOPO)
+ reset_config_on_routers(tgen)
+
+ step("Enable MLD on r1 interface")
+ step("Enable the PIM6 on all the interfaces of r1, r2, r3 and r4 routers")
+ step("r2: Configure r2 as RP")
+ step("r4: Configure r4 as RP")
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_LIST_1,
+ }
+ ]
+ }
+ },
+ "r4": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r4"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_LIST_2,
+ }
+ ]
+ }
+ },
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r2: Verify RP info")
+ dut = "r2"
+ oif = "lo"
+ rp_address = TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(
+ tgen, TOPO, dut, GROUP_RANGE_LIST_1, oif, rp_address, SOURCE
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify RP info")
+ dut = "r4"
+ rp_address = TOPO["routers"]["r4"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(
+ tgen, TOPO, dut, GROUP_RANGE_LIST_2, oif, rp_address, SOURCE
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ group_address_list = GROUP_ADDRESS_LIST_1 + GROUP_ADDRESS_LIST_2
+ step("r0: Send MLD join for 10 groups")
+ intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
+ intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
+ result = socat_send_mld_join(
+ tgen, "r0", "UDP6-RECV", group_address_list, intf, intf_ip
+ )
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify MLD groups")
+ dut = "r1"
+ intf_r1_r0 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r5: Send multicast traffic for group {}".format(group_address_list))
+ intf = TOPO["routers"]["r5"]["links"]["r3"]["interface"]
+ SOURCE_ADDRESS = TOPO["routers"]["r5"]["links"]["r3"]["ipv6"].split("/")[0]
+ result = socat_send_pim6_traffic(tgen, "r5", "UDP6-SEND", group_address_list, intf)
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, STAR, group_address_list)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, group_address_list, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream IIF interface")
+ dut = "r2"
+ iif = "lo"
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_1, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r2"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r2"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, joinState="NotJoined"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ GROUP_ADDRESS_LIST_1,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r2: Verify (S, G) ip mroutes")
+ oif = "none"
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ GROUP_ADDRESS_LIST_1,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r4"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_2)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_2, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (*, G) upstream IIF interface")
+ dut = "r4"
+ iif = "lo"
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_2)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_2, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r4"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r4"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, joinState="NotJoined"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ GROUP_ADDRESS_LIST_2,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r4: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r4: Verify (S, G) ip mroutes")
+ oif = "none"
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ GROUP_ADDRESS_LIST_2,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("Delete RP configuration")
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_LIST_1,
+ "delete": True,
+ }
+ ]
+ }
+ },
+ "r4": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r4"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_LIST_2,
+ "delete": True,
+ }
+ ]
+ }
+ },
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1, r2, r3, r4: Re-configure RP")
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_LIST_1,
+ }
+ ]
+ }
+ },
+ "r4": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r4"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_LIST_2,
+ }
+ ]
+ }
+ },
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1: Shut/No Shut the interfacesfrom R1 to R2, R4 and R0")
+ dut = "r1"
+ intf1 = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ intf2 = TOPO["routers"]["r1"]["links"]["r4"]["interface"]
+ intf3 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ for intf in [intf1, intf2, intf3]:
+ shutdown_bringup_interface(tgen, dut, intf, False)
+ shutdown_bringup_interface(tgen, dut, intf, True)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_1, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream IIF interface")
+ dut = "r2"
+ iif = "lo"
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_1, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r2"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r2"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, joinState="NotJoined"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ GROUP_ADDRESS_LIST_1,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r2: Verify (S, G) ip mroutes")
+ oif = "none"
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ GROUP_ADDRESS_LIST_1,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream IIF interface")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r4"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_2)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_2, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (S, G) ip mroutes")
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (*, G) upstream IIF interface")
+ dut = "r4"
+ iif = "lo"
+ result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_2)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (*, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen, dut, iif, STAR, GROUP_ADDRESS_LIST_2, addr_type="ipv6"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (*, G) ip mroutes")
+ oif = TOPO["routers"]["r4"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (S, G) upstream IIF interface")
+ iif = TOPO["routers"]["r4"]["links"]["r3"]["interface"]
+ result = verify_upstream_iif(
+ tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, joinState="NotJoined"
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r4: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ GROUP_ADDRESS_LIST_2,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r4: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r4: Verify (S, G) ip mroutes")
+ oif = "none"
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream IIF interface")
+ dut = "r3"
+ iif = TOPO["routers"]["r3"]["links"]["r5"]["interface"]
+ result = verify_upstream_iif(tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r3: Verify (S, G) upstream join state and join timer")
+ result = verify_join_state_and_timer(
+ tgen,
+ dut,
+ iif,
+ SOURCE_ADDRESS,
+ GROUP_ADDRESS_LIST_2,
+ addr_type="ipv6",
+ expected=False,
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r3: Verify (S, G) ip mroutes")
+ oif = TOPO["routers"]["r3"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_pim6_delete_RP_shut_noshut_upstream_interface_p1(request):
+ """
+ Verify RP info and (*,G) mroute after deleting the RP and shut /
+ no shut the RPF interface.
+
+ Topology used:
+ ________r2_____
+ | |
+ iperf | |
+ r0-----r1-------------r3
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Creating configuration from JSON")
+ kill_socat(tgen)
+ clear_pim6_mroute(tgen)
+ clear_pim6_interface_traffic(tgen, TOPO)
+ reset_config_on_routers(tgen)
+
+ step("Enable MLD on r1 interface")
+ step("Enable the PIM6 on all the interfaces of r1, r2, r3 and r4 routers")
+ step("r2: Configure r2 as RP")
+ input_dict = {
+ "r2": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_1,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r2: verify rp-info")
+ dut = "r2"
+ oif = "lo"
+ rp_address = TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
+ result = verify_pim_rp_info(
+ tgen, TOPO, dut, GROUP_ADDRESS_1, oif, rp_address, SOURCE
+ )
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r0: Send MLD join")
+ intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
+ intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
+ result = socat_send_mld_join(
+ tgen, "r0", "UDP6-RECV", GROUP_ADDRESS_1, intf, intf_ip
+ )
+ assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
+ step("r1: Verify MLD groups")
+ dut = "r1"
+ intf_r1_r0 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mld_groups(tgen, dut, intf_r1_r0, GROUP_ADDRESS_1)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Verify (*, G) ip mroutes created")
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r2: Verify (*, G) ip mroutes created")
+ dut = "r2"
+ iif = "lo"
+ oif = TOPO["routers"]["r2"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif)
+ assert result is True, ASSERT_MSG.format(tc_name, result)
+
+ step("r1: Delete RP configuration")
+ input_dict = {
+ "r1": {
+ "pim6": {
+ "rp": [
+ {
+ "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
+ "/"
+ )[0],
+ "group_addr_range": GROUP_RANGE_1,
+ "delete": True,
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, TOPO, input_dict)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("r1: Shut/No Shut the interface r1-r2-eth1/r1-r0-eth0 from R1 to R2")
+ dut = "r1"
+ intf1 = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ intf2 = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ for intf in [intf1, intf2]:
+ shutdown_bringup_interface(tgen, dut, intf, False)
+ shutdown_bringup_interface(tgen, dut, intf, True)
+
+ step("r2: Shut the RP interface lo")
+ dut = "r2"
+ intf = "lo"
+ shutdown_bringup_interface(tgen, dut, intf, False)
+
+ step("r1: Shut the interface r1-r2-eth1/r1-r3-eth2 towards RP")
+ intf3 = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
+ for intf in [intf1, intf3]:
+ shutdown_bringup_interface(tgen, dut, intf, False)
+
+ step("r1: Verify (*, G) ip mroutes cleared")
+ dut = "r1"
+ iif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
+ oif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif, expected=False)
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: (*,G) mroutes are not cleared after shut of R1 to R0 link\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ step("r2: Verify (*, G) ip mroutes cleared")
+ dut = "r2"
+ iif = "lo"
+ oif = TOPO["routers"]["r2"]["links"]["r1"]["interface"]
+ result = verify_mroutes(tgen, dut, STAR, GROUP_ADDRESS_1, iif, oif, expected=False)
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r2: (*,G) mroutes are not cleared after shut of R1 to R0 link\n Error: {}".format(
+ tc_name, result
+ )
+ )
+
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/multicast_pim_static_rp_topo1/test_multicast_pimv6_static_rp.py b/tests/topotests/multicast_pim_static_rp_topo1/test_multicast_pimv6_static_rp.py
deleted file mode 100755
index bd5473a511..0000000000
--- a/tests/topotests/multicast_pim_static_rp_topo1/test_multicast_pimv6_static_rp.py
+++ /dev/null
@@ -1,414 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Copyright (c) 2022 by VMware, Inc. ("VMware")
-# Used Copyright (c) 2018 by Network Device Education Foundation,
-# Inc. ("NetDEF") in this file.
-#
-# Permission to use, copy, modify, and/or distribute this software
-# for any purpose with or without fee is hereby granted, provided
-# that the above copyright notice and this permission notice appear
-# in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
-# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
-# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
-# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
-# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
-# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
-# OF THIS SOFTWARE.
-#
-
-"""
-Following tests are covered to test Multicast basic functionality:
-
-Topology:
-
- _______r2_____
- | |
- iperf | | iperf
- r0-----r1-------------r3-----r5
- | |
- |_____________|
- r4
-
-Test steps
-- Create topology (setup module)
-- Bring up topology
-
-TC_1 : Verify upstream interfaces(IIF) and join state are updated properly
- after adding and deleting the static RP
-TC_2 : Verify IIF and OIL in "show ip pim state" updated properly after
- adding and deleting the static RP
-TC_3: (*, G) Mroute entry are cleared when static RP gets deleted
-TC_4: Verify (*,G) prune is send towards the RP after deleting the static RP
-TC_24 : Verify (*,G) and (S,G) populated correctly when SPT and RPT share the
- same path
-"""
-
-import os
-import sys
-import json
-import time
-import pytest
-from time import sleep
-import datetime
-
-# Save the Current Working Directory to find configuration files.
-CWD = os.path.dirname(os.path.realpath(__file__))
-sys.path.append(os.path.join(CWD, "../"))
-sys.path.append(os.path.join(CWD, "../lib/"))
-
-# Required to instantiate the topology builder class.
-
-# pylint: disable=C0413
-# Import topogen and topotest helpers
-from lib.topogen import Topogen, get_topogen
-
-from lib.common_config import (
- start_topology,
- write_test_header,
- write_test_footer,
- reset_config_on_routers,
- step,
- shutdown_bringup_interface,
- kill_router_daemons,
- start_router_daemons,
- create_static_routes,
- check_router_status,
- socat_send_igmp_join_traffic,
- topo_daemons
-)
-from lib.pim import (
- create_pim_config,
- verify_igmp_groups,
- verify_upstream_iif,
- verify_join_state_and_timer,
- verify_mroutes,
- verify_pim_neighbors,
- verify_pim_interface_traffic,
- verify_pim_rp_info,
- verify_pim_state,
- clear_pim_interface_traffic,
- clear_igmp_interfaces,
- clear_pim_interfaces,
- clear_mroute,
- clear_mroute_verify,
-)
-from lib.topolog import logger
-from lib.topojson import build_topo_from_json, build_config_from_json
-
-# Global variables
-GROUP_RANGE_V6 = "ff08::/64"
-IGMP_JOIN_V6 = "ff08::1"
-STAR = "*"
-SOURCE = "Static"
-
-pytestmark = [pytest.mark.pimd]
-
-
-def build_topo(tgen):
- """Build function"""
-
- # Building topology from json file
- build_topo_from_json(tgen, TOPO)
-
-
-def setup_module(mod):
- """
- Sets up the pytest environment
-
- * `mod`: module name
- """
-
- testsuite_run_time = time.asctime(time.localtime(time.time()))
- logger.info("Testsuite start time: %s", testsuite_run_time)
- logger.info("=" * 40)
-
- topology = """
-
- _______r2_____
- | |
- iperf | | iperf
- r0-----r1-------------r3-----r5
- | |
- |_____________|
- r4
-
- """
- logger.info("Master Topology: \n %s", topology)
-
- logger.info("Running setup_module to create topology")
-
- # This function initiates the topology build with Topogen...
- json_file = "{}/multicast_pimv6_static_rp.json".format(CWD)
- tgen = Topogen(json_file, mod.__name__)
- global TOPO
- TOPO = tgen.json_topo
-
- # ... and here it calls Mininet initialization functions.
-
- # get list of daemons needs to be started for this suite.
- daemons = topo_daemons(tgen, TOPO)
-
- # Starting topology, create tmp files which are loaded to routers
- # to start daemons and then start routers
- start_topology(tgen, daemons)
-
- # Don"t run this test if we have any failure.
- if tgen.routers_have_failure():
- pytest.skip(tgen.errors)
-
- # Creating configuration from JSON
- build_config_from_json(tgen, TOPO)
-
- # Verify PIM neighbors
- result = verify_pim_neighbors(tgen, TOPO)
- assert result is True, "setup_module :Failed \n Error:" " {}".format(result)
-
- logger.info("Running setup_module() done")
-
-
-def teardown_module():
- """Teardown the pytest environment"""
-
- logger.info("Running teardown_module to delete topology")
- tgen = get_topogen()
-
- # Stop toplogy and Remove tmp files
- tgen.stop_topology()
-
- logger.info("Testsuite end time: %s", time.asctime(time.localtime(time.time())))
- logger.info("=" * 40)
-
-
-#####################################################
-#
-# Testcases
-#
-#####################################################
-
-
-def verify_state_incremented(state_before, state_after):
- """
- API to compare interface traffic state incrementing
-
- Parameters
- ----------
- * `state_before` : State dictionary for any particular instance
- * `state_after` : State dictionary for any particular instance
- """
-
- for router, state_data in state_before.items():
- for state, value in state_data.items():
- if state_before[router][state] >= state_after[router][state]:
- errormsg = (
- "[DUT: %s]: state %s value has not"
- " incremented, Initial value: %s, "
- "Current value: %s [FAILED!!]"
- % (
- router,
- state,
- state_before[router][state],
- state_after[router][state],
- )
- )
- return errormsg
-
- logger.info(
- "[DUT: %s]: State %s value is "
- "incremented, Initial value: %s, Current value: %s"
- " [PASSED!!]",
- router,
- state,
- state_before[router][state],
- state_after[router][state],
- )
-
- return True
-
-
-#####################################################
-
-def test_pimv6_add_delete_static_RP_p0(request):
- """
- TC_1: Verify upstream interfaces(IIF) and join state are updated
- properly after adding and deleting the static RP
- TC_2: Verify IIF and OIL in "show ip pim state" updated properly
- after adding and deleting the static RP
- TC_3: (*, G) Mroute entry are cleared when static RP gets deleted
- TC_4: Verify (*,G) prune is send towards the RP after deleting the
- static RP
-
- TOPOlogy used:
- r0------r1-----r2
- iperf DUT RP
- """
-
- tgen = get_topogen()
- tc_name = request.node.name
- write_test_header(tc_name)
-
- # Don"t run this test if we have any failure.
- if tgen.routers_have_failure():
- check_router_status(tgen)
-
- step("Shut link b/w R1 and R3 and R1 and R4 as per tescase topology")
- intf_r1_r3 = TOPO["routers"]["r1"]["links"]["r3"]["interface"]
- intf_r1_r4 = TOPO["routers"]["r1"]["links"]["r4"]["interface"]
- for intf in [intf_r1_r3, intf_r1_r4]:
- shutdown_bringup_interface(tgen, "r1", intf, ifaceaction=False)
-
- step("Enable PIM between r1 and r2")
- step("Enable MLD on r1 interface and send IGMP " "join (FF08::1) to r1")
- step("Configure r2 loopback interface as RP")
- input_dict = {
- "r2": {
- "pim6": {
- "rp": [
- {
- "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
- "/"
- )[0],
- "group_addr_range": GROUP_RANGE_V6,
- }
- ]
- }
- }
- }
-
- result = create_pim_config(tgen, TOPO, input_dict)
- assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
-
- step("Verify show ip pim interface traffic without any mld join")
- state_dict = {
- "r1": {TOPO["routers"]["r1"]["links"]["r2"]["interface"]: ["pruneTx"]}
- }
-
- state_before = verify_pim_interface_traffic(tgen, state_dict, addr_type="ipv6")
- assert isinstance(
- state_before, dict
- ), "Testcase{} : Failed \n state_before is not dictionary \n " "Error: {}".format(
- tc_name, result
- )
-
- step("send mld join (FF08::1) to R1")
- intf = TOPO["routers"]["r0"]["links"]["r1"]["interface"]
- intf_ip = TOPO["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0]
- result = socat_send_igmp_join_traffic(
- tgen, "r0", "UDP6-RECV", IGMP_JOIN_V6, intf, intf_ip, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- step("r1: Verify RP info")
- dut = "r1"
- oif = TOPO["routers"]["r1"]["links"]["r2"]["interface"]
- iif = TOPO["routers"]["r1"]["links"]["r0"]["interface"]
- rp_address = TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
- result = verify_pim_rp_info(
- tgen, TOPO, dut, GROUP_RANGE_V6, oif, rp_address, SOURCE
- )
- assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
-
- step("r1: Verify upstream IIF interface")
- result = verify_upstream_iif(tgen, dut, oif, STAR, IGMP_JOIN_V6)
- assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
-
- step("r1: Verify upstream join state and join timer")
- result = verify_join_state_and_timer(tgen, dut, oif, STAR, IGMP_JOIN_V6)
- assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
-
- step("r1: Verify PIM state")
- result = verify_pim_state(tgen, dut, oif, iif, IGMP_JOIN_V6)
- assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
-
- step("r1: Verify ip mroutes")
- result = verify_mroutes(tgen, dut, STAR, IGMP_JOIN_V6, oif, iif)
- assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
-
- step("r1: Delete RP configuration")
- input_dict = {
- "r2": {
- "pim6": {
- "rp": [
- {
- "rp_addr": TOPO["routers"]["r2"]["links"]["lo"]["ipv6"].split(
- "/"
- )[0],
- "group_addr_range": GROUP_RANGE_V6,
- "delete": True,
- }
- ]
- }
- }
- }
-
- result = create_pim_config(tgen, TOPO, input_dict)
- assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
-
- step("r1: Verify RP info")
- result = verify_pim_rp_info(
- tgen, TOPO, dut, GROUP_RANGE_V6, oif, rp_address, SOURCE, expected=False
- )
- assert (
- result is not True
- ), "Testcase {} :Failed \n " "RP: {} info is still present \n Error: {}".format(
- tc_name, rp_address, result
- )
-
- step("r1: Verify upstream IIF interface")
- result = verify_upstream_iif(tgen, dut, oif, STAR, IGMP_JOIN_V6, expected=False)
- assert result is not True, (
- "Testcase {} :Failed \n "
- "Upstream ({}, {}) is still in join state \n Error: {}".format(
- tc_name, STAR, IGMP_JOIN_V6, result
- )
- )
-
- step("r1: Verify upstream join state and join timer")
- result = verify_join_state_and_timer(
- tgen, dut, oif, STAR, IGMP_JOIN_V6, expected=False
- )
- assert result is not True, (
- "Testcase {} :Failed \n "
- "Upstream ({}, {}) timer is still running \n Error: {}".format(
- tc_name, STAR, IGMP_JOIN_V6, result
- )
- )
-
- step("r1: Verify PIM state")
- result = verify_pim_state(tgen, dut, oif, iif, IGMP_JOIN_V6, expected=False)
- assert result is not True, (
- "Testcase {} :Failed \n "
- "PIM state for group: {} is still Active \n Error: {}".format(
- tc_name, IGMP_JOIN_V6, result
- )
- )
-
- step("r1: Verify ip mroutes")
- result = verify_mroutes(tgen, dut, STAR, IGMP_JOIN_V6, oif, iif, expected=False)
- assert result is not True, (
- "Testcase {} :Failed \n "
- "mroute ({}, {}) is still present \n Error: {}".format(
- tc_name, STAR, IGMP_JOIN_V6, result
- )
- )
-
- step("r1: Verify show ip pim interface traffic without any IGMP join")
- state_after = verify_pim_interface_traffic(tgen, state_dict, addr_type="ipv6")
- assert isinstance(
- state_after, dict
- ), "Testcase{} : Failed \n state_before is not dictionary \n " "Error: {}".format(
- tc_name, result
- )
-
- result = verify_state_incremented(state_before, state_after)
- assert result is True, "Testcase{} : Failed Error: {}".format(tc_name, result)
-
- write_test_footer(tc_name)
-
-
-if __name__ == "__main__":
- args = ["-s"] + sys.argv[1:]
- sys.exit(pytest.main(args))
diff --git a/tests/topotests/pytest.ini b/tests/topotests/pytest.ini
index 7dd13935b1..6986e3051c 100644
--- a/tests/topotests/pytest.ini
+++ b/tests/topotests/pytest.ini
@@ -46,6 +46,7 @@ markers =
pathd: Tests that run against PATHD
pbrd: Tests that run against PBRD
pimd: Tests that run against PIMD
+ pim6d: Tests that run against PIM6D
ripd: Tests that run against RIPD
ripngd: Tests that run against RIPNGD
sharpd: Tests that run against SHARPD