summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/pim.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/pim.py')
-rw-r--r--tests/topotests/lib/pim.py240
1 files changed, 235 insertions, 5 deletions
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py
index cc56ffdd8c..369a794ebc 100644
--- a/tests/topotests/lib/pim.py
+++ b/tests/topotests/lib/pim.py
@@ -332,6 +332,13 @@ def create_igmp_config(tgen, topo, input_dict=None, build=False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
+ if attribute == "static-group":
+ for group in data:
+ cmd = "ip {} {} {}".format(protocol, attribute, group)
+ if del_attr:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
if attribute == "query":
for query, value in data.items():
if query != "delete":
@@ -1600,7 +1607,7 @@ def verify_pim_rp_info(
if type(group_addresses) is not list:
group_addresses = [group_addresses]
- if type(oif) is not list:
+ if oif is not None and type(oif) is not list:
oif = [oif]
for grp in group_addresses:
@@ -1739,6 +1746,49 @@ def verify_pim_rp_info(
@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_rp_info_is_empty(tgen, dut, af="ipv4"):
+ """
+ Verify pim rp info by running "show ip pim rp-info" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+
+ Usage
+ -----
+ dut = "r1"
+ result = verify_pim_rp_info_is_empty(tgen, dut)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ ip_cmd = "ip"
+ if af == "ipv6":
+ ip_cmd = "ipv6"
+
+ logger.info("[DUT: %s]: Verifying %s rp info", dut, ip_cmd)
+ cmd = "show {} pim rp-info json".format(ip_cmd)
+ show_ip_rp_info_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ if show_ip_rp_info_json:
+ errormsg = "[DUT %s]: Verifying empty rp-info [FAILED]!!" % (dut)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
def verify_pim_state(
tgen,
dut,
@@ -2404,10 +2454,11 @@ def clear_igmp_interfaces(tgen, dut):
# Verify uptime for groups
for group in group_before_clear.keys():
- d1 = datetime.datetime.strptime(group_before_clear[group], "%H:%M:%S")
- d2 = datetime.datetime.strptime(group_after_clear[group], "%H:%M:%S")
- if d2 >= d1:
- errormsg = ("[DUT: %s]: IGMP group is not cleared", " [FAILED!!]", dut)
+ if group in group_after_clear:
+ d1 = datetime.datetime.strptime(group_before_clear[group], "%H:%M:%S")
+ d2 = datetime.datetime.strptime(group_after_clear[group], "%H:%M:%S")
+ if d2 >= d1:
+ errormsg = ("[DUT: %s]: IGMP group is not cleared", " [FAILED!!]", dut)
logger.info("[DUT: %s]: IGMP group is cleared [PASSED!!]")
@@ -2744,6 +2795,48 @@ def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=N
return True
+def scapy_send_autorp_raw_packet(tgen, senderRouter, senderInterface, packet=None):
+ """
+ Using scapy Raw() method to send AutoRP raw packet from one FRR
+ to other
+
+ Parameters:
+ -----------
+ * `tgen` : Topogen object
+ * `senderRouter` : Sender router
+ * `senderInterface` : SenderInterface
+ * `packet` : AutoRP packet in raw format
+
+ returns:
+ --------
+ errormsg or True
+ """
+
+ global CWD
+ result = ""
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ python3_path = tgen.net.get_exec_path(["python3", "python"])
+ # send_bsr_packet.py has no direct ties to bsr, just sends a raw packet out
+ # a given interface, so just reuse it
+ script_path = os.path.join(CWD, "send_bsr_packet.py")
+ node = tgen.net[senderRouter]
+
+ cmd = [
+ python3_path,
+ script_path,
+ packet,
+ senderInterface,
+ "--interval=1",
+ "--count=1",
+ ]
+ logger.info("Scapy cmd: \n %s", cmd)
+ node.cmd_raises(cmd)
+
+ logger.debug("Exiting lib API: scapy_send_autorp_raw_packet")
+ return True
+
+
def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None):
"""
Find which RP is having lowest prioriy and returns rp IP
@@ -4254,6 +4347,143 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
return True
+@retry(retry_timeout=62)
+def verify_static_groups(tgen, dut, interface, group_addresses):
+ """
+ Verify static groups are received from an intended interface
+ by running "show ip igmp static-group json" command
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `interface`: interface, from which IGMP groups are configured
+ * `group_addresses`: IGMP group address
+
+ Usage
+ -----
+ dut = "r1"
+ interface = "r1-r0-eth0"
+ group_address = "225.1.1.1"
+ result = verify_static_groups(tgen, dut, interface, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying static groups received:", dut)
+ show_static_group_json = run_frr_cmd(
+ rnode, "show ip igmp static-group json", isjson=True
+ )
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ if interface not in show_static_group_json:
+ errormsg = (
+ "[DUT %s]: Verifying static group received"
+ " from interface %s [FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ for grp_addr in group_addresses:
+ found = False
+ for index in show_static_group_json[interface]["groups"]:
+ if index["group"] == grp_addr:
+ found = True
+ break
+ if not found:
+ errormsg = (
+ "[DUT %s]: Verifying static group received"
+ " from interface %s [FAILED]!! "
+ " Expected: %s " % (dut, interface, grp_addr)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying static group %s received "
+ "from interface %s [PASSED]!! ",
+ dut,
+ grp_addr,
+ interface,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=62)
+def verify_local_igmp_proxy_groups(
+ tgen, dut, group_addresses_present, group_addresses_not_present
+):
+ """
+ Verify igmp proxy groups are as expected by running
+ "show ip igmp static-group json" command
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `group_addresses_present`: IGMP group addresses which should
+ currently be proxied
+ * `group_addresses_not_present`: IGMP group addresses which should
+ not currently be proxied
+
+ Usage
+ -----
+ dut = "r1"
+ group_addresses_present = "225.1.1.1"
+ group_addresses_not_present = "225.2.2.2"
+ result = verify_igmp_proxy_groups(tgen, dut, group_p, group_np)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ if dut not in tgen.routers():
+ errormsg = "[DUT %s]: Device not found!"
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying local IGMP proxy groups:", dut)
+
+ out = rnode.vtysh_cmd("show ip igmp proxy json", isjson=True)
+ groups = [g["group"] if "group" in g else None for g in out["r1-eth1"]["groups"]]
+
+ if type(group_addresses_present) is not list:
+ group_addresses_present = [group_addresses_present]
+ if type(group_addresses_not_present) is not list:
+ group_addresses_not_present = [group_addresses_not_present]
+
+ for test_addr in group_addresses_present:
+ if not test_addr in groups:
+ errormsg = (
+ "[DUT %s]: Verifying local IGMP proxy joins FAILED!! "
+ " Expected but not found: %s " % (dut, test_addr)
+ )
+ return errormsg
+
+ for test_addr in group_addresses_not_present:
+ if test_addr in groups:
+ errormsg = (
+ "[DUT %s]: Verifying local IGMP proxy join removed FAILED!! "
+ " Unexpected but found: %s " % (dut, test_addr)
+ )
+ return errormsg
+
+ return True
+
+
def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"):
"""
Verify ip pim interface traffic by running