diff options
Diffstat (limited to 'tests/topotests/lib/pim.py')
| -rw-r--r-- | tests/topotests/lib/pim.py | 240 |
1 files changed, 235 insertions, 5 deletions
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py index cc56ffdd8c..369a794ebc 100644 --- a/tests/topotests/lib/pim.py +++ b/tests/topotests/lib/pim.py @@ -332,6 +332,13 @@ def create_igmp_config(tgen, topo, input_dict=None, build=False): cmd = "no {}".format(cmd) config_data.append(cmd) + if attribute == "static-group": + for group in data: + cmd = "ip {} {} {}".format(protocol, attribute, group) + if del_attr: + cmd = "no {}".format(cmd) + config_data.append(cmd) + if attribute == "query": for query, value in data.items(): if query != "delete": @@ -1600,7 +1607,7 @@ def verify_pim_rp_info( if type(group_addresses) is not list: group_addresses = [group_addresses] - if type(oif) is not list: + if oif is not None and type(oif) is not list: oif = [oif] for grp in group_addresses: @@ -1739,6 +1746,49 @@ def verify_pim_rp_info( @retry(retry_timeout=60, diag_pct=0) +def verify_pim_rp_info_is_empty(tgen, dut, af="ipv4"): + """ + Verify pim rp info by running "show ip pim rp-info" cli + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: device under test + + Usage + ----- + dut = "r1" + result = verify_pim_rp_info_is_empty(tgen, dut) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + if dut not in tgen.routers(): + return False + + rnode = tgen.routers()[dut] + + ip_cmd = "ip" + if af == "ipv6": + ip_cmd = "ipv6" + + logger.info("[DUT: %s]: Verifying %s rp info", dut, ip_cmd) + cmd = "show {} pim rp-info json".format(ip_cmd) + show_ip_rp_info_json = run_frr_cmd(rnode, cmd, isjson=True) + + if show_ip_rp_info_json: + errormsg = "[DUT %s]: Verifying empty rp-info [FAILED]!!" % (dut) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +@retry(retry_timeout=60, diag_pct=0) def verify_pim_state( tgen, dut, @@ -2404,10 +2454,11 @@ def clear_igmp_interfaces(tgen, dut): # Verify uptime for groups for group in group_before_clear.keys(): - d1 = datetime.datetime.strptime(group_before_clear[group], "%H:%M:%S") - d2 = datetime.datetime.strptime(group_after_clear[group], "%H:%M:%S") - if d2 >= d1: - errormsg = ("[DUT: %s]: IGMP group is not cleared", " [FAILED!!]", dut) + if group in group_after_clear: + d1 = datetime.datetime.strptime(group_before_clear[group], "%H:%M:%S") + d2 = datetime.datetime.strptime(group_after_clear[group], "%H:%M:%S") + if d2 >= d1: + errormsg = ("[DUT: %s]: IGMP group is not cleared", " [FAILED!!]", dut) logger.info("[DUT: %s]: IGMP group is cleared [PASSED!!]") @@ -2744,6 +2795,48 @@ def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=N return True +def scapy_send_autorp_raw_packet(tgen, senderRouter, senderInterface, packet=None): + """ + Using scapy Raw() method to send AutoRP raw packet from one FRR + to other + + Parameters: + ----------- + * `tgen` : Topogen object + * `senderRouter` : Sender router + * `senderInterface` : SenderInterface + * `packet` : AutoRP packet in raw format + + returns: + -------- + errormsg or True + """ + + global CWD + result = "" + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + python3_path = tgen.net.get_exec_path(["python3", "python"]) + # send_bsr_packet.py has no direct ties to bsr, just sends a raw packet out + # a given interface, so just reuse it + script_path = os.path.join(CWD, "send_bsr_packet.py") + node = tgen.net[senderRouter] + + cmd = [ + python3_path, + script_path, + packet, + senderInterface, + "--interval=1", + "--count=1", + ] + logger.info("Scapy cmd: \n %s", cmd) + node.cmd_raises(cmd) + + logger.debug("Exiting lib API: scapy_send_autorp_raw_packet") + return True + + def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None): """ Find which RP is having lowest prioriy and returns rp IP @@ -4254,6 +4347,143 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses): return True +@retry(retry_timeout=62) +def verify_static_groups(tgen, dut, interface, group_addresses): + """ + Verify static groups are received from an intended interface + by running "show ip igmp static-group json" command + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: device under test + * `interface`: interface, from which IGMP groups are configured + * `group_addresses`: IGMP group address + + Usage + ----- + dut = "r1" + interface = "r1-r0-eth0" + group_address = "225.1.1.1" + result = verify_static_groups(tgen, dut, interface, group_address) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + if dut not in tgen.routers(): + return False + + rnode = tgen.routers()[dut] + + logger.info("[DUT: %s]: Verifying static groups received:", dut) + show_static_group_json = run_frr_cmd( + rnode, "show ip igmp static-group json", isjson=True + ) + + if type(group_addresses) is not list: + group_addresses = [group_addresses] + + if interface not in show_static_group_json: + errormsg = ( + "[DUT %s]: Verifying static group received" + " from interface %s [FAILED]!! " % (dut, interface) + ) + return errormsg + + for grp_addr in group_addresses: + found = False + for index in show_static_group_json[interface]["groups"]: + if index["group"] == grp_addr: + found = True + break + if not found: + errormsg = ( + "[DUT %s]: Verifying static group received" + " from interface %s [FAILED]!! " + " Expected: %s " % (dut, interface, grp_addr) + ) + return errormsg + + logger.info( + "[DUT %s]: Verifying static group %s received " + "from interface %s [PASSED]!! ", + dut, + grp_addr, + interface, + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +@retry(retry_timeout=62) +def verify_local_igmp_proxy_groups( + tgen, dut, group_addresses_present, group_addresses_not_present +): + """ + Verify igmp proxy groups are as expected by running + "show ip igmp static-group json" command + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: device under test + * `group_addresses_present`: IGMP group addresses which should + currently be proxied + * `group_addresses_not_present`: IGMP group addresses which should + not currently be proxied + + Usage + ----- + dut = "r1" + group_addresses_present = "225.1.1.1" + group_addresses_not_present = "225.2.2.2" + result = verify_igmp_proxy_groups(tgen, dut, group_p, group_np) + + Returns + ------- + errormsg(str) or True + """ + + if dut not in tgen.routers(): + errormsg = "[DUT %s]: Device not found!" + return errormsg + + rnode = tgen.routers()[dut] + + logger.info("[DUT: %s]: Verifying local IGMP proxy groups:", dut) + + out = rnode.vtysh_cmd("show ip igmp proxy json", isjson=True) + groups = [g["group"] if "group" in g else None for g in out["r1-eth1"]["groups"]] + + if type(group_addresses_present) is not list: + group_addresses_present = [group_addresses_present] + if type(group_addresses_not_present) is not list: + group_addresses_not_present = [group_addresses_not_present] + + for test_addr in group_addresses_present: + if not test_addr in groups: + errormsg = ( + "[DUT %s]: Verifying local IGMP proxy joins FAILED!! " + " Expected but not found: %s " % (dut, test_addr) + ) + return errormsg + + for test_addr in group_addresses_not_present: + if test_addr in groups: + errormsg = ( + "[DUT %s]: Verifying local IGMP proxy join removed FAILED!! " + " Unexpected but found: %s " % (dut, test_addr) + ) + return errormsg + + return True + + def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"): """ Verify ip pim interface traffic by running |
