summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/pim.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/pim.py')
-rw-r--r--tests/topotests/lib/pim.py100
1 files changed, 88 insertions, 12 deletions
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py
index f7440efd6d..71e36b6229 100644
--- a/tests/topotests/lib/pim.py
+++ b/tests/topotests/lib/pim.py
@@ -149,7 +149,7 @@ def _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict):
# At least one interface must be enabled for PIM on the router
pim_if_enabled = False
pim6_if_enabled = False
- for destLink, data in topo[dut]["links"].items():
+ for _, data in topo[dut]["links"].items():
if "pim" in data:
pim_if_enabled = True
if "pim6" in data:
@@ -332,6 +332,13 @@ def create_igmp_config(tgen, topo, input_dict=None, build=False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
+ if attribute == "static-group":
+ for group in data:
+ cmd = "ip {} {} {}".format(protocol, attribute, group)
+ if del_attr:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
if attribute == "query":
for query, value in data.items():
if query != "delete":
@@ -603,7 +610,7 @@ def find_rp_details(tgen, topo):
# ip address of RP
rp_addr = rp_dict["rp_addr"]
- for link, data in topo["routers"][router]["links"].items():
+ for _, data in topo["routers"][router]["links"].items():
if data["ipv4"].split("/")[0] == rp_addr:
rp_details[router] = rp_addr
@@ -2089,7 +2096,7 @@ def verify_pim_interface(
)
return True
else:
- for destLink, data in topo["routers"][dut]["links"].items():
+ for _, data in topo["routers"][dut]["links"].items():
if "type" in data and data["type"] == "loopback":
continue
@@ -2292,7 +2299,7 @@ def clear_pim_interfaces(tgen, dut):
# Waiting for maximum 60 sec
fail_intf = []
- for retry in range(1, 13):
+ for _ in range(1, 13):
sleep(5)
logger.info("[DUT: %s]: Waiting for 5 sec for PIM neighbors" " to come up", dut)
run_json_after = run_frr_cmd(rnode, "show ip pim neighbor json", isjson=True)
@@ -2368,7 +2375,7 @@ def clear_igmp_interfaces(tgen, dut):
total_groups_before_clear = igmp_json["totalGroups"]
- for key, value in igmp_json.items():
+ for _, value in igmp_json.items():
if type(value) is not dict:
continue
@@ -2381,7 +2388,7 @@ def clear_igmp_interfaces(tgen, dut):
result = run_frr_cmd(rnode, "clear ip igmp interfaces")
# Waiting for maximum 60 sec
- for retry in range(1, 13):
+ for _ in range(1, 13):
logger.info(
"[DUT: %s]: Waiting for 5 sec for igmp interfaces" " to come up", dut
)
@@ -2460,7 +2467,7 @@ def clear_mroute_verify(tgen, dut, expected=True):
# RFC 3376: 8.2. Query Interval - Default: 125 seconds
# So waiting for maximum 130 sec to get the igmp report
- for retry in range(1, 26):
+ for _ in range(1, 26):
logger.info("[DUT: %s]: Waiting for 2 sec for mroutes" " to come up", dut)
sleep(5)
keys_json1 = mroute_json_1.keys()
@@ -2671,7 +2678,7 @@ def add_rp_interfaces_and_pim_config(tgen, topo, interface, rp, rp_mapping):
try:
config_data = []
- for group, rp_list in rp_mapping.items():
+ for _, rp_list in rp_mapping.items():
for _rp in rp_list:
config_data.append("interface {}".format(interface))
config_data.append("ip address {}".format(_rp))
@@ -2720,7 +2727,7 @@ def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=N
script_path = os.path.join(CWD, "send_bsr_packet.py")
node = tgen.net[senderRouter]
- for destLink, data in topo["routers"][senderRouter]["links"].items():
+ for _, data in topo["routers"][senderRouter]["links"].items():
if "type" in data and data["type"] == "loopback":
continue
@@ -2795,12 +2802,12 @@ def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None):
# RP with lowest priority
if len(priority_dict) != 1:
- rp_p, lowest_priority = sorted(rp_priority.items(), key=lambda x: x[1])[0]
+ rp_p, _ = sorted(rp_priority.items(), key=lambda x: x[1])[0]
rp_details[group] = rp_p
# RP with highest hash value
if len(priority_dict) == 1:
- rp_h, highest_hash = sorted(rp_hash.items(), key=lambda x: x[1])[-1]
+ rp_h, _ = sorted(rp_hash.items(), key=lambda x: x[1])[-1]
rp_details[group] = rp_h
# RP with highest IP address
@@ -3239,7 +3246,7 @@ def verify_pim_join(
interface_json = show_pim_join_json[interface]
grp_addr = grp_addr.split("/")[0]
- for source, data in interface_json[grp_addr].items():
+ for _, data in interface_json[grp_addr].items():
# Verify pim join
if pim_join:
if data["group"] == grp_addr and data["channelJoinName"] == "JOIN":
@@ -4253,6 +4260,75 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
+@retry(retry_timeout=62)
+def verify_static_groups(tgen, dut, interface, group_addresses):
+ """
+ Verify static groups are received from an intended interface
+ by running "show ip igmp static-group json" command
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `interface`: interface, from which IGMP groups are configured
+ * `group_addresses`: IGMP group address
+
+ Usage
+ -----
+ dut = "r1"
+ interface = "r1-r0-eth0"
+ group_address = "225.1.1.1"
+ result = verify_static_groups(tgen, dut, interface, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying static groups received:", dut)
+ show_static_group_json = run_frr_cmd(rnode, "show ip igmp static-group json", isjson=True)
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ if interface not in show_static_group_json:
+ errormsg = (
+ "[DUT %s]: Verifying static group received"
+ " from interface %s [FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ for grp_addr in group_addresses:
+ found = False
+ for index in show_static_group_json[interface]["groups"]:
+ if index["group"] == grp_addr:
+ found = True
+ break
+ if not found:
+ errormsg = (
+ "[DUT %s]: Verifying static group received"
+ " from interface %s [FAILED]!! "
+ " Expected: %s " % (dut, interface, grp_addr)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying static group %s received "
+ "from interface %s [PASSED]!! ",
+ dut,
+ grp_addr,
+ interface,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"):
"""