# R1 interface eth2
switch = tgen.add_switch("s3")
tgen.add_host("h1", "192.168.100.100/24", "via 192.168.100.1")
+ tgen.add_host("h3", "192.168.100.101/24", "via 192.168.100.1")
switch.add_link(tgen.gears["r1"])
switch.add_link(tgen.gears["h1"])
+ switch.add_link(tgen.gears["h3"])
# R2 interface eth1
switch = tgen.add_switch("s4")
app_helper.stop_host("h1")
+def test_igmp_immediate_leave():
+ "Test IGMPv2 immediate leave feature."
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ topotest.sysctl_assure(
+ tgen.gears["h1"],
+ "net.ipv4.conf.h1-eth0.force_igmp_version",
+ "2")
+ tgen.gears["r1"].vtysh_cmd("""
+ configure terminal
+ interface r1-eth2
+ ip igmp immediate-leave
+ """)
+
+ app_helper.run("h1", ["224.0.110.1", "h1-eth0"])
+ app_helper.run("h3", ["224.0.110.1", "h3-eth0"])
+
+ def expect_igmp_group():
+ igmp_groups = tgen.gears["r1"].vtysh_cmd("show ip igmp groups json", isjson=True)
+ try:
+ for group in igmp_groups["r1-eth2"]["groups"]:
+ if group["group"] == "224.0.110.1":
+ return True
+
+ return False
+ except KeyError:
+ return False
+
+ topotest.run_and_expect(expect_igmp_group, True, count=10, wait=2)
+
+ # Send leave and expect immediate leave
+ app_helper.stop_host("h1")
+ topotest.run_and_expect(expect_igmp_group, False, count=10, wait=2)
+
+ # Clean up
+ tgen.gears["r1"].vtysh_cmd("""
+ configure terminal
+ interface r1-eth2
+ no ip igmp immediate-leave
+ """)
+ topotest.sysctl_assure(
+ tgen.gears["h1"],
+ "net.ipv4.conf.h1-eth0.force_igmp_version",
+ "0")
+ app_helper.stop_host("h3")
+
+
+def test_mldv1_immediate_leave():
+ "Test MLDv1 immediate leave feature."
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ topotest.sysctl_assure(
+ tgen.gears["h1"],
+ "net.ipv6.conf.h1-eth0.force_mld_version",
+ "1")
+ tgen.gears["r1"].vtysh_cmd("""
+ configure terminal
+ interface r1-eth2
+ ipv6 mld immediate-leave
+ """)
+
+ app_helper.run("h1", ["ff05::2000", "h1-eth0"])
+ app_helper.run("h3", ["ff05::2000", "h3-eth0"])
+
+ def expect_mld_group():
+ igmp_groups = tgen.gears["r1"].vtysh_cmd("show ipv6 mld groups json", isjson=True)
+ try:
+ for group in igmp_groups["r1-eth2"]["groups"]:
+ if group["group"] == "ff05::2000":
+ return True
+
+ return False
+ except KeyError:
+ return False
+
+ topotest.run_and_expect(expect_mld_group, True, count=10, wait=2)
+
+ # Send leave and expect immediate leave
+ app_helper.stop_host("h1")
+ topotest.run_and_expect(expect_mld_group, False, count=10, wait=2)
+
+ # Clean up
+ tgen.gears["r1"].vtysh_cmd("""
+ configure terminal
+ interface r1-eth2
+ no ipv6 mld immediate-leave
+ """)
+ topotest.sysctl_assure(
+ tgen.gears["h1"],
+ "net.ipv6.conf.h1-eth0.force_mld_version",
+ "0")
+ app_helper.stop_host("h3")
+
+
def test_memory_leak():
"Run the memory leak test and report results."
tgen = get_topogen()