for dut in input_dict.keys():
rnode = tgen.routers()[dut]
-
- for interface, data in input_dict[dut]["igmp"]["interfaces"].items():
+ for interface, data in input_dict[dut]["mld"]["interfaces"].items():
statistics = False
report = False
- if "statistics" in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"]:
+ if "statistics" in input_dict[dut]["mld"]["interfaces"][interface]["mld"]:
statistics = True
cmd = "show ipv6 mld statistics"
else:
rnode, "{} interface {} json".format(cmd, interface), isjson=True
)
+ show_ipv6_mld_intf_json = show_ipv6_mld_intf_json["default"]
+
if not report:
if interface not in show_ipv6_mld_intf_json:
errormsg = (
for query, value in data["mld"]["query"].items():
if query == "query-interval":
# Verifying IGMP interface query interval timer
- if intf_detail_json["timerQueryInterval"] != value:
+ if intf_detail_json["timerQueryIntervalMsec"] != value * 1000:
errormsg = (
"[DUT %s]: MLD interface: %s "
" query-interval verification "
dut,
interface,
value,
- intf_detail_json["timerQueryInterval"],
+ intf_detail_json["timerQueryIntervalMsec"],
)
)
return errormsg
"[DUT %s]: MLD interface: %s " "query-interval is %s",
dut,
interface,
- value,
+ value * 1000,
)
if query == "query-max-response-time":
# Verifying IGMP interface query max response timer
if (
- intf_detail_json["timerQueryResponseIntervalMsec"]
+ intf_detail_json["timerQueryResponseTimerMsec"]
!= value * 100
):
errormsg = (
% (
dut,
interface,
- value * 1000,
- intf_detail_json["timerQueryResponseIntervalMsec"],
+ value * 100,
+ intf_detail_json["timerQueryResponseTimerMsec"],
)
)
return errormsg
if query == "last-member-query-interval":
# Verifying IGMP interface last member query interval
if (
- intf_detail_json["timerLastMemberQueryMsec"]
- != value * 100 * intf_detail_json["lastMemberQueryCount"]
+ intf_detail_json["timerLastMemberQueryIntervalMsec"]
+ != value * 100
):
errormsg = (
"[DUT %s]: MLD interface: %s "
% (
dut,
interface,
- value * 1000,
- intf_detail_json["timerLastMemberQueryMsec"],
+ value * 100,
+ intf_detail_json[
+ "timerLastMemberQueryIntervalMsec"
+ ],
)
)
return errormsg
"last-member-query-interval is %s ms",
dut,
interface,
- value * intf_detail_json["lastMemberQueryCount"] * 100,
+ value * 100,
)
if "version" in data["mld"]:
@retry(retry_timeout=60, diag_pct=0)
-def verify_pim_nexthop(tgen, topo, dut, nexthop, addr_type):
+def verify_pim_nexthop(tgen, topo, dut, nexthop, addr_type="ipv4"):
"""
Verify all PIM nexthop details using "show ip/ipv6 pim neighbor" cli
return True
+@retry(retry_timeout=60, diag_pct=0)
def verify_sg_traffic(tgen, dut, groups, src, addr_type="ipv4"):
"""
Verify multicast traffic by running
Usage
-----
- result = verify_sg_traffic(tgen, "r1", igmp_groups, srcaddress)
+ result = verify_sg_traffic(tgen, "r1", igmp_groups/mld_groups, srcaddress)
Returns
-------
after_traffic[grp] = show_mroute_sg_traffic_json[grp][src]["packets"]
for grp in groups:
- if after_traffic[grp] < before_traffic[grp]:
+ if after_traffic[grp] <= before_traffic[grp]:
errormsg = (
"[DUT %s]: Verifying igmp group %s source %s not increamenting traffic"
" [FAILED]!! " % (dut, grp, src)
return result
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_pim6_config(tgen, input_dict, expected=True):
+ """
+ Verify pim interface details, verifying following configs:
+ drPriority
+ helloPeriod
+ helloReceived
+ helloSend
+ drAddress
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict` : Input dict data, required to verify
+ timer
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ input_dict ={
+ "l1": {
+ "mld": {
+ "interfaces": {
+ "l1-i1-eth1": {
+ "pim6": {
+ "drPriority" : 10,
+ "helloPeriod" : 5
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = verify_pim6_config(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ for interface, data in input_dict[dut]["pim6"]["interfaces"].items():
+
+ logger.info(
+ "[DUT: %s]: Verifying PIM6 interface %s detail:", dut, interface
+ )
+
+ show_ipv6_pim_intf_json = run_frr_cmd(
+ rnode, "show ipv6 pim interface {} json".format(interface), isjson=True
+ )
+
+ if interface not in show_ipv6_pim_intf_json:
+ errormsg = (
+ "[DUT %s]: PIM6 interface: %s "
+ " is not present in CLI output "
+ "[FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ intf_detail_json = show_ipv6_pim_intf_json[interface]
+
+ for config, value in data.items():
+ if config == "helloPeriod":
+ # Verifying PIM interface helloPeriod
+ if intf_detail_json["helloPeriod"] != value:
+ errormsg = (
+ "[DUT %s]: PIM6 interface: %s "
+ " helloPeriod verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (dut, interface, value, intf_detail_json["helloPeriod"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: PIM6 interface: %s " "helloPeriod is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if config == "drPriority":
+ # Verifying PIM interface drPriority
+ if intf_detail_json["drPriority"] != value:
+ errormsg = (
+ "[DUT %s]: PIM6 interface: %s "
+ " drPriority verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (dut, interface, value, intf_detail_json["drPriority"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: PIM6 interface: %s " "drPriority is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if config == "drAddress":
+ # Verifying PIM interface drAddress
+ if intf_detail_json["drAddress"] != value:
+ errormsg = (
+ "[DUT %s]: PIM6 interface: %s "
+ " drAddress verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (dut, interface, value, intf_detail_json["drAddress"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: PIM6 interface: %s " "drAddress is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
# def cleanup(self):
# super(McastTesterHelper, self).cleanup()