print("Show that v4 routes are right\n")
v4_routesFile = "%s/r%s/ipv4_routes.ref" % (thisDir, i)
- expected = net["r%s" % i].cmd(
- "sort {} 2> /dev/null".format(v4_routesFile)
- ).rstrip()
+ expected = (
+ net["r%s" % i].cmd("sort {} 2> /dev/null".format(v4_routesFile)).rstrip()
+ )
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
actual = (
print("Show that v6 routes are right\n")
v6_routesFile = "%s/r%s/ipv6_routes.ref" % (thisDir, i)
- expected = net["r%s" % i].cmd(
- "sort {} 2> /dev/null".format(v6_routesFile)
- ).rstrip()
+ expected = (
+ net["r%s" % i].cmd("sort {} 2> /dev/null".format(v6_routesFile)).rstrip()
+ )
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
actual = (
associated_int = r1_snmp.get(
"mplsL3VpnVrfAssociatedInterfaces.{}".format(snmp_str_to_oid("VRF-a"))
)
- assertmsg = "mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format(
- associated_int
+ assertmsg = (
+ "mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format(
+ associated_int
+ )
)
assert associated_int == "3", assertmsg
kill_mininet_routers_process,
get_frr_ipv6_linklocal,
create_route_maps,
- required_linux_kernel_version
+ required_linux_kernel_version,
)
# Reading the data from JSON File for topology and configuration creation
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
result = verify_r_bit(
tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r2: R-bit is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r2: R-bit is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
logger.info("Restart BGPd on R2 ")
kill_router_daemons(tgen, "r2", ["bgpd"])
result = verify_r_bit(
tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r2: R-bit is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r2: R-bit is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
write_test_footer(tc_name)
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
result = verify_r_bit(
tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: R-bit is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: R-bit is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
# Verifying BGP RIB routes
next_hop = next_hop_per_address_family(
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info("[Phase 4] : R1 is about to come up now ")
start_router_daemons(tgen, "r1", ["bgpd"])
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
result = verify_f_bit(
tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: F-bit is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: F-bit is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
write_test_footer(tc_name)
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
result = verify_r_bit(
tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: R-bit is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: R-bit is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
# Verifying BGP RIB routes
next_hop = next_hop_per_address_family(
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
protocol = "bgp"
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
dut = "r2"
peer = "r1"
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r2: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r2: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
step("Bring up BGP on R1 and remove Peer-level GR config from R1")
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
step("Bring up BGP on R2 and remove Peer-level GR config from R1")
kill_mininet_routers_process,
get_frr_ipv6_linklocal,
create_route_maps,
- required_linux_kernel_version
+ required_linux_kernel_version,
)
# Reading the data from JSON File for topology and configuration creation
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r2: EOR is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r2: EOR is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
logger.info(
"Waiting for selection deferral timer({} sec)..".format(GR_SELECT_DEFER_TIMER)
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r1", peer="r3", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: EOR is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: EOR is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
logger.info(
"Waiting for selection deferral timer({} sec).. ".format(
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r3", peer="r1", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r3: EOR is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r3: EOR is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
write_test_footer(tc_name)
dut = "r6"
input_dict_1 = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r6: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r6: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r6: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying BGP RIB routes
dut = "r2"
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r6: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r6: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
write_test_footer(tc_name)
# Verifying BGP RIB routes before shutting down BGPd daemon
input_dict = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r3: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
logger.info("[Step 4] : Start BGPd daemon on R1..")
result = verify_rib(
tgen, addr_type, dut, input_dict_2, next_hop_4, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
if addr_type == "ipv6":
result = verify_rib(
tgen, addr_type, dut, input_dict_2, next_hop_6, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
logger.info("[Step 4] : Start BGPd daemon on R1 and R4..")
result = verify_f_bit(
tgen, topo, addr_type, input_dict, "r3", "r1", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r3: F-bit is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r3: F-bit is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying BGP RIB routes after starting BGPd daemon
input_dict_1 = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r3: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
write_test_footer(tc_name)
result = verify_f_bit(
tgen, topo, addr_type, input_dict_2, "r3", "r1", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r3: F-bit is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r3: F-bit is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
logger.info(" Expected behavior: {}".format(result))
logger.info("[Step 3] : Kill BGPd daemon on R1..")
# Verifying BGP RIB routes
input_dict = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(tgen, addr_type, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r3: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
# Start BGPd daemon on R1
result = verify_eor(
tgen, topo, addr_type, input_dict_3, dut="r5", peer="r1", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r5: EOR is set to TRUE\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r5: EOR is set to TRUE\n Error: {}".format(
tc_name, result
- ))
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying BGP RIB routes after starting BGPd daemon
input_dict_1 = {key: topo["routers"][key] for key in ["r5"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r3: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
write_test_footer(tc_name)
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r3", peer="r1", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r3: EOR is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r3: EOR is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying BGP RIB routes after starting BGPd daemon
result = verify_eor(
tgen, topo, addr_type, input_dict_3, dut="r1", peer="r3", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: EOR is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: EOR is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
write_test_footer(tc_name)
# Verifying BGP RIB routes before shutting down BGPd daemon
input_dict = {key: topo["routers"][key] for key in ["r3"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
write_test_footer(tc_name)
dut = "r1"
input_dict_1 = {key: topo["routers"][key] for key in ["r3"]}
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
write_test_footer(tc_name)
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: EOR is set to True\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: EOR is set to True\n Error: {}".format(
tc_name, result
- ))
+ )
# Verifying BGP RIB routes received from router R1
dut = "r1"
dut = "r3"
input_dict_1 = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r3: routes are still present in ZEBRA\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
+ )
logger.info(" Expected behavior: {}".format(result))
# Start BGPd daemon on R1
result = verify_bgp_community(
tgen, adt, dut, NETWORKS[adt], input_dict_4, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"largeCommunity is still present after deleting route-map \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
write_test_footer(tc_name)
dut = "r6"
for adt in ADDR_TYPES:
result = verify_bgp_community(tgen, adt, dut, NETWORKS[adt], expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "Community-list is still present \n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "Community-list is still present \n Error: {}".format(tc_name, result)
+ )
write_test_footer(tc_name)
result = verify_bgp_community(
tgen, adt, dut, NETWORKS[adt], input_dict_7, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "largeCommunity is still present \n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "largeCommunity is still present \n Error: {}".format(tc_name, result)
+ )
write_test_footer(tc_name)
)
result = verify_rib(tgen, addr_type, dut, input_dict_1, tag=500, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "Routes are present with tag value 500 \n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "Routes are present with tag value 500 \n Error: {}".format(tc_name, result)
+ )
logger.info("Expected Behavior: {}".format(result))
step(
result = verify_rib(
tgen, addr_type, dut, input_dict_1, metric=123, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"Routes are present with metric value 123 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info("Expected Behavior: {}".format(result))
result = verify_rib(tgen, addr_type, dut, input_dict_2, metric=123)
result = verify_rib(
tgen, addr_type, dut, input_dict_2, metric=0, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"Routes are present with metric value 0 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info("Expected Behavior: {}".format(result))
write_test_footer(tc_name)
}
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"Routes are still present in VRF RED_A and RED_B \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
result = verify_rib(tgen, addr_type, dut, input_dict_2, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"Routes are still present in VRF BLUE_A and BLUE_B \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("Bring up BGPd daemon on R1.")
start_router_daemons(tgen, "r1", ["bgpd"])
protocol="bgp",
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "Routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Reconfigure the same static route on R2 again")
dut = "r2"
result = verify_rib(
tgen, addr_type, "r2", input_dict_4, protocol="bgp", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "Routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
write_test_footer(tc_name)
next_hop=topo["routers"]["r2"]["links"]["r4"][addr_type].split("/")[0],
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "Routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Shut interface on R2 that has IP from the subnet as BGP next-hop")
intf_r2_r4 = topo["routers"]["r2"]["links"]["r4"]["interface"]
next_hop=topo["routers"]["r2"]["links"]["r4"][addr_type].split("/")[0],
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "Routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
write_test_footer(tc_name)
step("Verify that once eBGP multi-hop is removed, BGP session goes down")
result = verify_bgp_convergence_from_running_config(tgen, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "BGP is converged \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
- ))
+ )
step("Add ebgp-multihop command on R3 again")
for addr_type in ADDR_TYPES:
step("Verify that BGP session goes down, when update-source is removed")
result = verify_bgp_convergence_from_running_config(tgen, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "BGP is converged \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
- ))
+ )
step("Add update-source command on R1 again")
for addr_type in ADDR_TYPES:
next_hop=topo["routers"]["r1"]["links"]["r3"][addr_type].split("/")[0],
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "Routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
sleep(3)
step("Verify that BGP session goes down, when static route is removed")
result = verify_bgp_convergence_from_running_config(tgen, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "BGP is converged \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
- ))
+ )
step("Add static route on R3 again")
for addr_type in ADDR_TYPES:
sleep(3)
step("Verify that BGP neighborship between R1 and R3 goes down")
result = verify_bgp_convergence_from_running_config(tgen, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "BGP is converged \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
- ))
+ )
intf_r1_r3 = topo["routers"]["r1"]["links"]["r3"]["interface"]
shutdown_bringup_interface(tgen, "r1", intf_r1_r3, True)
],
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "Routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Reconfigure multipath-relax command on R4")
result = create_router_bgp(tgen, topo, maxpath_relax)
],
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "Routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Re-configure maximum-path 2 command on R4")
input_dict_8 = {
"configured but not peer routers"
)
result = verify_bgp_convergence(tgen, topo, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "BGP is converged \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
- ))
+ )
step("configure same password on R2 and R3")
for routerN in ["r2", "r3"]:
"strings are in CAPs on R2 and R3"
)
result = verify_bgp_convergence(tgen, topo, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "BGP is converged \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
- ))
+ )
step("Configure same password on R2 and R3 without CAPs")
for routerN in ["r2", "r3"]:
step("Verify if password is removed from R1, both sessions go down again")
result = verify_bgp_convergence(tgen, topo, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "BGP is converged \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
- ))
+ )
step("Configure alphanumeric password on R1 and peer routers R2,R3")
for bgp_neighbor in ["r2", "r3"]:
#
#####################################################
+
def test_dynamic_imported_routes_advertised_to_iBGP_peer_p0(request):
"""
TC5_FUNC_5:
for addr_type in ADDR_TYPES:
- step(
- "On router R1 delete static routes in vrf ISR to LOOPBACK_1"
- )
+ step("On router R1 delete static routes in vrf ISR to LOOPBACK_1")
input_routes_r1 = {
"r1": {
{
"network": [NETWORK1_3[addr_type], NETWORK1_4[addr_type]],
"next_hop": (intf_r2_r1[addr_type]).split("/")[0],
- "delete": True
+ "delete": True,
}
]
}
from lib.topotest import g_extra_config as topotest_extra_config
from lib.topolog import logger
+
def pytest_addoption(parser):
"""
Add topology-only option to the topology tester. This option makes pytest
vtysh_on_error = config.getoption("--vtysh-on-error")
topotest_extra_config["vtysh_on_error"] = vtysh_on_error
- topotest_extra_config["pause_after"] = (
- pause_after or shell or vtysh
- )
+ topotest_extra_config["pause_after"] = pause_after or shell or vtysh
topotest_extra_config["topology_only"] = config.getoption("--topology-only")
else:
error = True
# Handle assert failures
- parent._previousfailed = item # pylint: disable=W0212
+ parent._previousfailed = item # pylint: disable=W0212
logger.error(
- 'assert failed at "{}/{}": {}'.format(modname, item.name, call.excinfo.value)
+ 'assert failed at "{}/{}": {}'.format(
+ modname, item.name, call.excinfo.value
+ )
)
# (topogen) Set topology error to avoid advancing in the test.
# This will cause topogen to report error on `routers_have_failure`.
tgen.set_error("{}/{}".format(modname, item.name))
-
if error and topotest_extra_config["shell_on_error"]:
for router in tgen.routers():
pause = True
switch.add_link(tgen.gears["r3"])
-
def setup_module(mod):
"Sets up the pytest environment"
# Don't start the following in the CE nodes
if router.name[0] == "r":
router.load_config(
- TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname)),
+ TopoRouter.RD_ISIS,
+ os.path.join(CWD, "{}/isisd.conf".format(rname)),
"-M snmp",
)
router.load_config(
- TopoRouter.RD_LDP, os.path.join(CWD, "{}/ldpd.conf".format(rname)),
+ TopoRouter.RD_LDP,
+ os.path.join(CWD, "{}/ldpd.conf".format(rname)),
)
router.load_config(
- TopoRouter.RD_SNMP, os.path.join(CWD, "{}/snmpd.conf".format(rname)),
+ TopoRouter.RD_SNMP,
+ os.path.join(CWD, "{}/snmpd.conf".format(rname)),
"-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap",
)
# After loading the configurations, this function loads configured daemons.
tgen.start_router()
+
def teardown_module(mod):
"Teardown the pytest environment"
tgen = get_topogen()
# This function tears down the whole topology.
tgen.stop_topology()
+
def router_compare_json_output(rname, command, reference):
"Compare router JSON output"
assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)
assert diff is None, assertmsg
+
def generate_oid(numoids, index1, index2):
if numoids == 1:
oid = "{}".format(index1)
router_compare_json_output(
rname,
"show yang operational-data /frr-interface:lib isisd",
- "show_yang_interface_isis_adjacencies.ref")
+ "show_yang_interface_isis_adjacencies.ref",
+ )
+
def test_r1_scalar_snmp():
"Wait for protocol convergence"
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
- assert r1_snmp.test_oid('isisSysVersion', "one(1)")
- assert r1_snmp.test_oid('isisSysLevelType', "level1and2(3)")
- assert r1_snmp.test_oid('isisSysID',"00 00 00 00 00 01")
- assert r1_snmp.test_oid('isisSysMaxPathSplits',"32")
- assert r1_snmp.test_oid('isisSysMaxLSPGenInt',"900 seconds")
- assert r1_snmp.test_oid('isisSysAdminState',"on(1)")
- assert r1_snmp.test_oid('isisSysMaxAge',"1200 seconds")
- assert r1_snmp.test_oid('isisSysProtSupported',"07 5 6 7")
+ assert r1_snmp.test_oid("isisSysVersion", "one(1)")
+ assert r1_snmp.test_oid("isisSysLevelType", "level1and2(3)")
+ assert r1_snmp.test_oid("isisSysID", "00 00 00 00 00 01")
+ assert r1_snmp.test_oid("isisSysMaxPathSplits", "32")
+ assert r1_snmp.test_oid("isisSysMaxLSPGenInt", "900 seconds")
+ assert r1_snmp.test_oid("isisSysAdminState", "on(1)")
+ assert r1_snmp.test_oid("isisSysMaxAge", "1200 seconds")
+ assert r1_snmp.test_oid("isisSysProtSupported", "07 5 6 7")
r2 = tgen.net.get("r2")
r2_snmp = SnmpTester(r2, "2.2.2.2", "public", "2c")
-
- assert r2_snmp.test_oid('isisSysVersion', "one(1)")
- assert r2_snmp.test_oid('isisSysLevelType', "level1and2(3)")
- assert r2_snmp.test_oid('isisSysID',"00 00 00 00 00 02")
- assert r2_snmp.test_oid('isisSysMaxPathSplits',"32")
- assert r2_snmp.test_oid('isisSysMaxLSPGenInt',"900 seconds")
- assert r2_snmp.test_oid('isisSysAdminState',"on(1)")
- assert r2_snmp.test_oid('isisSysMaxAge',"1200 seconds")
- assert r2_snmp.test_oid('isisSysProtSupported',"07 5 6 7")
+
+ assert r2_snmp.test_oid("isisSysVersion", "one(1)")
+ assert r2_snmp.test_oid("isisSysLevelType", "level1and2(3)")
+ assert r2_snmp.test_oid("isisSysID", "00 00 00 00 00 02")
+ assert r2_snmp.test_oid("isisSysMaxPathSplits", "32")
+ assert r2_snmp.test_oid("isisSysMaxLSPGenInt", "900 seconds")
+ assert r2_snmp.test_oid("isisSysAdminState", "on(1)")
+ assert r2_snmp.test_oid("isisSysMaxAge", "1200 seconds")
+ assert r2_snmp.test_oid("isisSysProtSupported", "07 5 6 7")
circtable_test = {
"isisCircMeshGroupEnabled": ["inactive(1)", "inactive(1)", "inactive(1)"],
"isisCircSmallHellos": ["false(2)", "false(2)", "false(2)"],
"isisCirc3WayEnabled": ["false(2)", "false(2)", "false(2)"],
- }
+}
+
def test_r1_isisCircTable():
tgen = get_topogen()
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
oids = []
- oids.append(generate_oid(1,1,0))
- oids.append(generate_oid(1,2,0))
- oids.append(generate_oid(1,3,0))
+ oids.append(generate_oid(1, 1, 0))
+ oids.append(generate_oid(1, 2, 0))
+ oids.append(generate_oid(1, 3, 0))
# check items
for item in circtable_test.keys():
)
assert r1_snmp.test_oid_walk(item, circtable_test[item], oids), assertmsg
+
circleveltable_test = {
"isisCircLevelMetric": ["10", "10", "10", "10"],
"isisCircLevelWideMetric": ["10", "10", "0", "0"],
"isisCircLevelISPriority": ["64", "64", "64", "64"],
"isisCircLevelHelloMultiplier": ["10", "10", "10", "10"],
- "isisCircLevelHelloTimer": ["3000 milliseconds", "3000 milliseconds", "3000 milliseconds", "3000 milliseconds"],
- "isisCircLevelMinLSPRetransInt": ["1 seconds", "1 seconds", "0 seconds", "0 seconds"],
- }
+ "isisCircLevelHelloTimer": [
+ "3000 milliseconds",
+ "3000 milliseconds",
+ "3000 milliseconds",
+ "3000 milliseconds",
+ ],
+ "isisCircLevelMinLSPRetransInt": [
+ "1 seconds",
+ "1 seconds",
+ "0 seconds",
+ "0 seconds",
+ ],
+}
+
def test_r1_isislevelCircTable():
tgen = get_topogen()
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
oids = []
- oids.append(generate_oid(2,1,"area"))
- oids.append(generate_oid(2,2,"area"))
- oids.append(generate_oid(2,3,"area"))
- oids.append(generate_oid(2,3,"domain"))
+ oids.append(generate_oid(2, 1, "area"))
+ oids.append(generate_oid(2, 2, "area"))
+ oids.append(generate_oid(2, 3, "area"))
+ oids.append(generate_oid(2, 3, "domain"))
# check items
for item in circleveltable_test.keys():
"isisISAdjNeighPriority": ["64"],
}
+
def test_r1_isisAdjTable():
"check ISIS Adjacency Table"
tgen = get_topogen()
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
oids = []
- oids.append(generate_oid(2,1,1))
- oids.append(generate_oid(2,2,1))
+ oids.append(generate_oid(2, 1, 1))
+ oids.append(generate_oid(2, 2, 1))
oids_down = []
- oids_down.append(generate_oid(2,1,1))
+ oids_down.append(generate_oid(2, 1, 1))
# check items
for item in adjtable_test.keys():
)
assert r1_snmp.test_oid_walk(item, adjtable_test[item], oids), assertmsg
-
# shutdown interface and one adjacency should be removed
"check ISIS adjacency is removed when interface is shutdown"
r1_cmd.vtysh_cmd("conf t\ninterface r1-eth1\nshutdown")
assertmsg = "{} should be {} oids {} full dict {}:".format(
item, adjtable_down_test[item], oids_down, r1_snmp.walk(item)
)
- assert r1_snmp.test_oid_walk(item, adjtable_down_test[item], oids_down), assertmsg
+ assert r1_snmp.test_oid_walk(
+ item, adjtable_down_test[item], oids_down
+ ), assertmsg
# no shutdown interface and adjacency should be restored
r1_cmd.vtysh_cmd("conf t\ninterface r1-eth1\nno shutdown")
from lib.topolog import logger
from lib.topotest import iproute2_is_vrf_capable
from lib.common_config import (
- required_linux_kernel_version,
- adjust_router_l3mdev,
+ required_linux_kernel_version,
+ adjust_router_l3mdev,
)
from mininet.topo import Topo
pytestmark = [pytest.mark.isisd]
VERTEX_TYPE_LIST = [
- "pseudo_IS",
- "pseudo_TE-IS",
- "IS",
- "TE-IS",
- "ES",
- "IP internal",
- "IP external",
- "IP TE",
- "IP6 internal",
- "IP6 external",
- "UNKNOWN"
- ]
+ "pseudo_IS",
+ "pseudo_TE-IS",
+ "IS",
+ "TE-IS",
+ "ES",
+ "IP internal",
+ "IP external",
+ "IP TE",
+ "IP6 internal",
+ "IP6 external",
+ "UNKNOWN",
+]
+
class ISISTopo1(Topo):
"Simple two layer ISIS vrf topology"
ipv = "ipv4"
continue
- item_match = re.match(r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)"
- , line)
+ item_match = re.match(
+ r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)", line
+ )
if (
item_match is not None
and item_match.group(1) == "Vertex"
# Skip header
continue
- item_match = re.match(r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)"
- .format(vertex_type_regex), line)
+ item_match = re.match(
+ r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format(
+ vertex_type_regex
+ ),
+ line,
+ )
if item_match is not None:
areas[area][level][ipv].append(
{
)
continue
- item_match = re.match(r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)"
- .format(vertex_type_regex), line)
+ item_match = re.match(
+ r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex),
+ line,
+ )
if item_match is not None:
areas[area][level][ipv].append(
TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
)
router.load_config(
- TopoRouter.RD_LDP, os.path.join(CWD, "{}/ldpd.conf".format(rname)),
- "-M snmp"
+ TopoRouter.RD_LDP,
+ os.path.join(CWD, "{}/ldpd.conf".format(rname)),
+ "-M snmp",
)
router.load_config(
- TopoRouter.RD_SNMP, os.path.join(CWD, "{}/snmpd.conf".format(rname)),
- "-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap"
+ TopoRouter.RD_SNMP,
+ os.path.join(CWD, "{}/snmpd.conf".format(rname)),
+ "-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap",
)
-
tgen.start_router()
# Skip if previous fatal error condition is raised
# TODO: disabling this check to avoid 'snmpd not running' errors
- #if tgen.routers_have_failure():
+ # if tgen.routers_have_failure():
# pytest.skip(tgen.errors)
for rname in ["r1", "r2", "r3"]:
# Skip if previous fatal error condition is raised
# TODO: disabling this check to avoid 'snmpd not running' errors
- #if tgen.routers_have_failure():
+ # if tgen.routers_have_failure():
# pytest.skip(tgen.errors)
for rname in ["r1", "r2", "r3"]:
tgen = get_topogen()
# Skip if previous fatal error condition is raised
- #if tgen.routers_have_failure():
+ # if tgen.routers_have_failure():
# pytest.skip(tgen.errors)
for rname in ["r1", "r2", "r3"]:
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
- assert r1_snmp.test_oid('mplsLdpLsrId', "01 01 01 01")
- assert r1_snmp.test_oid('mplsLdpLsrLoopDetectionCapable', 'none(1)')
+ assert r1_snmp.test_oid("mplsLdpLsrId", "01 01 01 01")
+ assert r1_snmp.test_oid("mplsLdpLsrLoopDetectionCapable", "none(1)")
def test_r1_ldp_entity_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityLdpId', ['1.1.1.1:0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityIndex', ['1'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityProtocolVersion', ['1'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityAdminStatus', ['enable(1)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityOperStatus', ['enabled(2)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityTcpPort', ['646'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityUdpDscPort', ['646'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityMaxPduLength', ['4096 octets'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityKeepAliveHoldTimer', ['180 seconds'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityHelloHoldTimer', ['0 seconds'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityInitSessionThreshold', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityLabelDistMethod', ['downstreamUnsolicited(2)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityLabelRetentionMode', ['liberal(2)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityPathVectorLimit', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityHopCountLimit', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityTransportAddrKind', ['loopback(2)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityTargetPeer', ['true(1)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityTargetPeerAddrType', ['ipv4(1)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityTargetPeerAddr', ['01 01 01 01'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityLabelType', ['generic(1)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityDiscontinuityTime', ['(0) 0:00:00.00'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStorageType', ['nonVolatile(3)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityRowStatus', ['createAndGo(4)'])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityLdpId", ["1.1.1.1:0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityIndex", ["1"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityProtocolVersion", ["1"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityAdminStatus", ["enable(1)"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityOperStatus", ["enabled(2)"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityTcpPort", ["646"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityUdpDscPort", ["646"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityMaxPduLength", ["4096 octets"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityKeepAliveHoldTimer", ["180 seconds"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityHelloHoldTimer", ["0 seconds"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityInitSessionThreshold", ["0"])
+ assert r1_snmp.test_oid_walk(
+ "mplsLdpEntityLabelDistMethod", ["downstreamUnsolicited(2)"]
+ )
+ assert r1_snmp.test_oid_walk("mplsLdpEntityLabelRetentionMode", ["liberal(2)"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityPathVectorLimit", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityHopCountLimit", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityTransportAddrKind", ["loopback(2)"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityTargetPeer", ["true(1)"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityTargetPeerAddrType", ["ipv4(1)"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityTargetPeerAddr", ["01 01 01 01"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityLabelType", ["generic(1)"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityDiscontinuityTime", ["(0) 0:00:00.00"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStorageType", ["nonVolatile(3)"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityRowStatus", ["createAndGo(4)"])
def test_r1_ldp_entity_stats_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionAttempts", ["0"])
assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsSessionAttempts', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsSessionRejectedNoHelloErrors', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsSessionRejectedAdErrors', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsSessionRejectedMaxPduErrors', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsSessionRejectedLRErrors', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsBadLdpIdentifierErrors', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsBadPduLengthErrors', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsBadMessageLengthErrors', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsBadTlvLengthErrors', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsMalformedTlvValueErrors', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsKeepAliveTimerExpErrors', ['0'])
+ "mplsLdpEntityStatsSessionRejectedNoHelloErrors", ["0"]
+ )
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionRejectedAdErrors", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionRejectedMaxPduErrors", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionRejectedLRErrors", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadLdpIdentifierErrors", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadPduLengthErrors", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadMessageLengthErrors", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadTlvLengthErrors", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsMalformedTlvValueErrors", ["0"])
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsKeepAliveTimerExpErrors", ["0"])
assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsShutdownReceivedNotifications', ['0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpEntityStatsShutdownSentNotifications', ['0'])
+ "mplsLdpEntityStatsShutdownReceivedNotifications", ["0"]
+ )
+ assert r1_snmp.test_oid_walk("mplsLdpEntityStatsShutdownSentNotifications", ["0"])
def test_r1_ldp_peer_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
+ assert r1_snmp.test_oid_walk("mplsLdpPeerLdpId", ["2.2.2.2:0", "3.3.3.3:0"])
assert r1_snmp.test_oid_walk(
- 'mplsLdpPeerLdpId', ['2.2.2.2:0', '3.3.3.3:0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpPeerLabelDistMethod',
- ['downstreamUnsolicited(2)', 'downstreamUnsolicited(2)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpPeerPathVectorLimit', ['0', '0'])
+ "mplsLdpPeerLabelDistMethod",
+ ["downstreamUnsolicited(2)", "downstreamUnsolicited(2)"],
+ )
+ assert r1_snmp.test_oid_walk("mplsLdpPeerPathVectorLimit", ["0", "0"])
+ assert r1_snmp.test_oid_walk("mplsLdpPeerTransportAddrType", ["ipv4(1)", "ipv4(1)"])
assert r1_snmp.test_oid_walk(
- 'mplsLdpPeerTransportAddrType', ['ipv4(1)', 'ipv4(1)'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpPeerTransportAddr', ['02 02 02 02', '03 03 03 03'])
+ "mplsLdpPeerTransportAddr", ["02 02 02 02", "03 03 03 03"]
+ )
def test_r1_ldp_session_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
- assert r1_snmp.test_oid_walk('mplsLdpSessionState',
- ['operational(5)', 'operational(5)'])
- assert r1_snmp.test_oid_walk('mplsLdpSessionRole',
- ['passive(3)', 'passive(3)'])
- assert r1_snmp.test_oid_walk('mplsLdpSessionProtocolVersion',
- ['1', '1'])
- assert r1_snmp.test_oid_walk('mplsLdpSessionKeepAliveTime',
- ['180 seconds', '180 seconds'])
- assert r1_snmp.test_oid_walk('mplsLdpSessionMaxPduLength',
- ['4096 octets', '4096 octets'])
- assert r1_snmp.test_oid_walk('mplsLdpSessionDiscontinuityTime',
- ['(0) 0:00:00.00', '(0) 0:00:00.00'])
+ assert r1_snmp.test_oid_walk(
+ "mplsLdpSessionState", ["operational(5)", "operational(5)"]
+ )
+ assert r1_snmp.test_oid_walk("mplsLdpSessionRole", ["passive(3)", "passive(3)"])
+ assert r1_snmp.test_oid_walk("mplsLdpSessionProtocolVersion", ["1", "1"])
+ assert r1_snmp.test_oid_walk(
+ "mplsLdpSessionKeepAliveTime", ["180 seconds", "180 seconds"]
+ )
+ assert r1_snmp.test_oid_walk(
+ "mplsLdpSessionMaxPduLength", ["4096 octets", "4096 octets"]
+ )
+ assert r1_snmp.test_oid_walk(
+ "mplsLdpSessionDiscontinuityTime", ["(0) 0:00:00.00", "(0) 0:00:00.00"]
+ )
def test_r1_ldp_session_stats_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
- assert r1_snmp.test_oid_walk(
- 'mplsLdpSessionStatsUnknownMesTypeErrors', ['0', '0'])
- assert r1_snmp.test_oid_walk(
- 'mplsLdpSessionStatsUnknownTlvErrors', ['0', '0'])
+ assert r1_snmp.test_oid_walk("mplsLdpSessionStatsUnknownMesTypeErrors", ["0", "0"])
+ assert r1_snmp.test_oid_walk("mplsLdpSessionStatsUnknownTlvErrors", ["0", "0"])
def test_r1_ldp_hello_adjacency_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
- assert r1_snmp.test_oid_walk('mplsLdpHelloAdjacencyIndex',
- ['1', '2', '1'])
- assert r1_snmp.test_oid_walk('mplsLdpHelloAdjacencyHoldTime',
- ['15', '45', '15'])
- assert r1_snmp.test_oid_walk('mplsLdpHelloAdjacencyType',
- ['link(1)', 'targeted(2)', 'link(1)'])
+ assert r1_snmp.test_oid_walk("mplsLdpHelloAdjacencyIndex", ["1", "2", "1"])
+ assert r1_snmp.test_oid_walk("mplsLdpHelloAdjacencyHoldTime", ["15", "45", "15"])
+ assert r1_snmp.test_oid_walk(
+ "mplsLdpHelloAdjacencyType", ["link(1)", "targeted(2)", "link(1)"]
+ )
# Memory leak test template
run_frr_cmd,
FRRCFG_FILE,
retry,
- get_ipv6_linklocal_address
+ get_ipv6_linklocal_address,
)
LOGDIR = "/tmp/topotests/"
# Debug logs for daemons
DEBUG_LOGS = {
"pimd": [
- 'debug msdp events',
- 'debug msdp packets',
- 'debug igmp events',
- 'debug igmp trace',
- 'debug mroute',
- 'debug mroute detail',
- 'debug pim events',
- 'debug pim packets',
- 'debug pim trace',
- 'debug pim zebra',
- 'debug pim bsm',
- 'debug pim packets joins',
- 'debug pim packets register',
- 'debug pim nht'
+ "debug msdp events",
+ "debug msdp packets",
+ "debug igmp events",
+ "debug igmp trace",
+ "debug mroute",
+ "debug mroute detail",
+ "debug pim events",
+ "debug pim packets",
+ "debug pim trace",
+ "debug pim zebra",
+ "debug pim bsm",
+ "debug pim packets joins",
+ "debug pim packets register",
+ "debug pim nht",
],
"bgpd": [
- 'debug bgp neighbor-events',
- 'debug bgp updates',
- 'debug bgp zebra',
- 'debug bgp nht',
- 'debug bgp neighbor-events',
- 'debug bgp graceful-restart',
- 'debug bgp update-groups',
- 'debug bgp vpn leak-from-vrf',
- 'debug bgp vpn leak-to-vrf',
- 'debug bgp zebr',
- 'debug bgp updates',
- 'debug bgp nht',
- 'debug bgp neighbor-events',
- 'debug vrf'
+ "debug bgp neighbor-events",
+ "debug bgp updates",
+ "debug bgp zebra",
+ "debug bgp nht",
+ "debug bgp neighbor-events",
+ "debug bgp graceful-restart",
+ "debug bgp update-groups",
+ "debug bgp vpn leak-from-vrf",
+ "debug bgp vpn leak-to-vrf",
+ "debug bgp zebr",
+ "debug bgp updates",
+ "debug bgp nht",
+ "debug bgp neighbor-events",
+ "debug vrf",
],
"zebra": [
- 'debug zebra events',
- 'debug zebra rib',
- 'debug zebra vxlan',
- 'debug zebra nht'
+ "debug zebra events",
+ "debug zebra rib",
+ "debug zebra vxlan",
+ "debug zebra nht",
],
"ospf": [
- 'debug ospf event',
- 'debug ospf ism',
- 'debug ospf lsa',
- 'debug ospf nsm',
- 'debug ospf nssa',
- 'debug ospf packet all',
- 'debug ospf sr',
- 'debug ospf te',
- 'debug ospf zebra',
- ]
+ "debug ospf event",
+ "debug ospf ism",
+ "debug ospf lsa",
+ "debug ospf nsm",
+ "debug ospf nssa",
+ "debug ospf packet all",
+ "debug ospf sr",
+ "debug ospf te",
+ "debug ospf zebra",
+ ],
}
if config.has_option("topogen", "verbosity"):
log_file = debug_dict.setdefault("log_file", None)
if log_file:
- _log_file = os.path.join(LOGDIR, tgen.modname,
- log_file)
- debug_config.append("log file {} \n".\
- format(_log_file))
+ _log_file = os.path.join(LOGDIR, tgen.modname, log_file)
+ debug_config.append("log file {} \n".format(_log_file))
if type(enable_logs) is list:
for daemon in enable_logs:
for debug_log in debug_logs:
debug_config.append("no {}".format(debug_log))
- result = create_common_configuration(tgen, router,
- debug_config,
- "debug_log_config",
- build=build)
+ result = create_common_configuration(
+ tgen, router, debug_config, "debug_log_config", build=build
+ )
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
"""
tgen = get_topogen()
ext_nh = tgen.net[node].get_ipv6_linklocal()
- req_nh = topo[node]['links'][intf]['interface']
+ req_nh = topo[node]["links"][intf]["interface"]
llip = None
for llips in ext_nh:
if llips[0] == req_nh:
logger.info("Link local ip found = %s", llip)
return llip
- errormsg = "Failed: Link local ip not found on router {}, "\
- "interface {}".format(node, intf)
+ errormsg = "Failed: Link local ip not found on router {}, " "interface {}".format(
+ node, intf
+ )
return errormsg
)
output = tgen.net[router].cmd("sysctl -n net.ipv4.tcp_l3mdev_accept")
- logger.info(
- "router {0}: existing tcp_l3mdev_accept was {1}".format(router, output)
- )
+ logger.info("router {0}: existing tcp_l3mdev_accept was {1}".format(router, output))
tgen.net[router].cmd(
"sysctl -w net.ipv4.tcp_l3mdev_accept={}".format(l3mdev_accept)
for lnk in input_dict[router]["links"].keys():
if "ospf" not in input_dict[router]["links"][lnk]:
logger.debug(
- "Router %s: ospf config is not present in"
- "input_dict",
- router
+ "Router %s: ospf config is not present in" "input_dict", router
)
continue
ospf_data = input_dict[router]["links"][lnk]["ospf"]
g_extra_config = {}
+
def gdb_core(obj, daemon, corefiles):
gdbcmds = """
info threads
["ip", "route", "show", "vrf"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- stdin=subprocess.PIPE
+ stdin=subprocess.PIPE,
)
iproute2_err = subp.communicate()[1].splitlines()[0].split()[0]
# Run a command in a new window (gnome-terminal, screen, tmux, xterm)
def runInWindow(self, cmd, title=None):
topo_terminal = os.getenv("FRR_TOPO_TERMINAL")
- if topo_terminal or (
- "TMUX" not in os.environ and "STY" not in os.environ
- ):
+ if topo_terminal or ("TMUX" not in os.environ and "STY" not in os.environ):
term = topo_terminal if topo_terminal else "xterm"
- makeTerm(
- self,
- title=title if title else cmd,
- term=term,
- cmd=cmd)
+ makeTerm(self, title=title if title else cmd, term=term, cmd=cmd)
else:
nscmd = "sudo nsenter -m -n -t {} {}".format(self.pid, cmd)
if "TMUX" in os.environ:
cmd = "{} {}".format(wcmd, nscmd)
elif "STY" in os.environ:
if os.path.exists(
- "/run/screen/S-{}/{}".format(
- os.environ['USER'], os.environ['STY']
- )
+ "/run/screen/S-{}/{}".format(os.environ["USER"], os.environ["STY"])
):
wcmd = "screen"
else:
cmd = "{} {}".format(wcmd, nscmd)
self.cmd(cmd)
-
def startRouter(self, tgen=None):
# Disable integrated-vtysh-config
self.cmd(
def startRouterDaemons(self, daemons=None):
"Starts all FRR daemons for this router."
- gdb_breakpoints = g_extra_config["gdb_breakpoints"]
+ gdb_breakpoints = g_extra_config["gdb_breakpoints"]
gdb_daemons = g_extra_config["gdb_daemons"]
gdb_routers = g_extra_config["gdb_routers"]
if (
(gdb_routers or gdb_daemons)
- and (not gdb_routers
- or self.name in gdb_routers
- or "all" in gdb_routers)
- and (not gdb_daemons
- or daemon in gdb_daemons
- or "all" in gdb_daemons)
+ and (
+ not gdb_routers or self.name in gdb_routers or "all" in gdb_routers
+ )
+ and (not gdb_daemons or daemon in gdb_daemons or "all" in gdb_daemons)
):
if daemon == "snmpd":
cmdopt += " -f "
self.cmd(" ".join([cmdenv, binary, cmdopt]))
logger.info("{}: {} {} started".format(self, self.routertype, daemon))
-
# Start Zebra first
if "zebra" in daemons_list:
start_daemon("zebra", "-s 90000000")
input_dict = {
"i1": {"static_routes": [{"network": BSR1_ADDR, "next_hop": next_hop_rp}]},
"l1": {"static_routes": [{"network": BSR1_ADDR, "next_hop": next_hop_lhr}]},
- "f1": {"static_routes": [{"network": CRP, "next_hop": next_hop_fhr, "delete": True}]},
+ "f1": {
+ "static_routes": [
+ {"network": CRP, "next_hop": next_hop_fhr, "delete": True}
+ ]
+ },
}
result = create_static_routes(tgen, input_dict)
step("Verify if b1 chosen as BSR in l1")
result = verify_pim_bsr(tgen, topo, "l1", BSR_IP_1, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "b1 is not chosen as BSR in l1 \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "b1 is not chosen as BSR in l1 \n Error: {}".format(
tc_name, result
- ))
+ )
state_after = verify_pim_interface_traffic(tgen, state_dict)
assert isinstance(
# Verify bsr state in l1
step("Verify no BSR in l1 as i1 would not forward the no-forward bsm")
result = verify_pim_bsr(tgen, topo, "l1", bsr_ip, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"BSR data is present after no-forward bsm also \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
# unconfigure unicast bsm on f1-i1-eth2
step("unconfigure unicast bsm on f1-i1-eth2, will forward with only mcast")
result = verify_ip_mroutes(
tgen, "i1", src_addr, GROUP_ADDRESS, iif, oil, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "Mroutes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Mroutes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
# unconfigure bsm processing on f1 on f1-i1-eth2
step("unconfigure bsm processing on f1 in f1-i1-eth2, will drop bsm")
# Verify bsr state in i1
step("Verify if b1 is not chosen as BSR in i1")
result = verify_pim_bsr(tgen, topo, "i1", bsr_ip, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "b1 is chosen as BSR in i1 \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "b1 is chosen as BSR in i1 \n Error: {}".format(
tc_name, result
- ))
+ )
# check if mroute still not installed because of rp not available
step("check if mroute still not installed because of rp not available")
result = verify_ip_mroutes(
tgen, "i1", src_addr, GROUP_ADDRESS, iif, oil, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "mroute installed but rp not available \n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "mroute installed but rp not available \n Error: {}".format(tc_name, result)
+ )
# configure bsm processing on i1 on f1-i1-eth2
step("configure bsm processing on f1 in f1-i1-eth2, will accept bsm")
tgen, topo, "f1", group, rp_source="BSR", expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "bsr has not aged out in f1 \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "bsr has not aged out in f1 \n Error: {}".format(
tc_name, result
- ))
+ )
# Verify RP mapping removed after hold timer expires
group = "225.1.1.1/32"
result = verify_join_state_and_timer(
tgen, dut, iif, src_addr, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"join state is up and join timer is running in l1 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
# Verify ip mroute is not installed
step("Verify mroute not installed in l1")
result = verify_ip_mroutes(
tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "mroute installed in l1 \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format(
tc_name, result
- ))
+ )
step("clear BSM database before moving to next case")
clear_bsrp_data(tgen, topo)
result = verify_ip_mroutes(
tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "mroute installed in l1 \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format(
tc_name, result
- ))
+ )
# Add back route for RP to make it reachable
step("Add back route for RP to make it reachable")
result = verify_ip_mroutes(
tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, wait=20, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "mroute installed in l1 \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format(
tc_name, result
- ))
+ )
# Send BSM again to configure rp
step("Add back RP by sending BSM from b1")
# Verify bsr state in FHR
step("Verify if b2 is not chosen as bsr in f1")
result = verify_pim_bsr(tgen, topo, "f1", bsr_ip2, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "b2 is chosen as bsr in f1 \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "b2 is chosen as bsr in f1 \n Error: {}".format(
tc_name, result
- ))
+ )
# Verify if b1 is still chosen as bsr
step("Verify if b1 is still chosen as bsr in f1")
IGMP_JOIN_RANGE_1,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"upstream is still present after shut the link from "
- "FHR to RP from RP node \n Error: {}".format(
- tc_name, result
- ))
+ "FHR to RP from RP node \n Error: {}".format(tc_name, result)
+ )
step(" No shut the link from FHR to RP from RP node")
IGMP_JOIN_RANGE_1,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"upstream is still present after shut the link from "
- "FHR to RP from FHR node \n Error: {}".format(
- tc_name, result
- ))
+ "FHR to RP from FHR node \n Error: {}".format(tc_name, result)
+ )
step(" No shut the link from FHR to RP from FHR node")
data["oil"],
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"mroutes(S,G) are present after delete of static routes on c1 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
result = verify_upstream_iif(
tgen,
IGMP_JOIN_RANGE_1,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"upstream is present after delete of static routes on c1 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
for data in input_dict_starg:
result = verify_ip_mroutes(
data["oil"],
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"mroutes(*,G) are present after delete of static routes on c1 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
result = verify_upstream_iif(
tgen,
IGMP_JOIN_RANGE_1,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"upstream is present after delete of static routes on c1 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("Configure default routes on c2")
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_1, "Unknown", rp_address, SOURCE, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"RP info is unknown after removing static route from c2 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("Verify (s,g) populated after adding default route ")
data["oil"],
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "mroutes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroutes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
result = verify_upstream_iif(
tgen,
IGMP_JOIN_RANGE_1,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "upstream is still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "upstream is still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Configure default routes on all the nodes")
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_1, "Unknown", rp_address, SOURCE, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"RP info is unknown after removing static route from c2 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("Verify (s,g) populated after adding default route ")
dut = "r1"
interface = "r1-r0-eth0"
result = verify_igmp_groups(tgen, dut, interface, GROUP_ADDRESS, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: igmp group present without any IGMP join \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r1: Verify show ip pim interface traffic without any IGMP join")
state_dict = {"r1": {"r1-r2-eth1": ["pruneTx"]}}
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_ALL, iif, rp_address, SOURCE, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: RP info present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: RP info present \n Error: {}".format(
tc_name, result
- ))
+ )
step("r1: Verify upstream IIF interface")
result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: upstream IIF interface present \n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: upstream IIF interface present \n Error: {}".format(tc_name, result)
+ )
step("r1: Verify upstream join state and join timer")
result = verify_join_state_and_timer(
tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: upstream join state is up and join timer is running \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r1: Verify PIM state")
result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False)
step("r1: Verify ip mroutes")
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: mroutes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: mroutes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("r1: Verify show ip pim interface traffic without any IGMP join")
state_after = verify_pim_interface_traffic(tgen, state_dict)
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S, G) upstream join state is up and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r2-eth1"
"using show ip pim state"
)
result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"OIL is not same and IIF is not cleared on R1 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r1: upstream IIF should be unknown , verify using show ip pim" "upstream")
result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: upstream IIF is not unknown \n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: upstream IIF is not unknown \n Error: {}".format(tc_name, result)
+ )
step(
"r1: join state should not be joined and join timer should stop,"
result = verify_join_state_and_timer(
tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: join state is joined and timer is not stopped \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step(
"r1: (*,G) prune is sent towards the RP interface, verify using"
step("r1: (*, G) cleared from mroute table using show ip mroute")
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: (*, G) are not cleared from mroute table \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
logger.info("Expected behavior: {}".format(result))
# Uncomment next line for debugging
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_ALL, iif, rp_address, SOURCE, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: rp-info is present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: rp-info is present \n Error: {}".format(
tc_name, result
- ))
+ )
step("joinTx value before join sent")
state_dict = {"r1": {"r1-r2-eth1": ["joinTx"]}}
step("r1: Verify upstream IIF interface")
result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: upstream IFF interface is present \n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: upstream IFF interface is present \n Error: {}".format(tc_name, result)
+ )
step("r1: Verify upstream join state and join timer")
result = verify_join_state_and_timer(
tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: upstream join state is joined and timer is running \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r1: Verify PIM state")
result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: PIM state is up\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: PIM state is up\n Error: {}".format(
tc_name, result
- ))
+ )
step("r1: Verify ip mroutes")
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: mroutes are still present\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: mroutes are still present\n Error: {}".format(
tc_name, result
- ))
+ )
step("r1: Configure static RP")
input_dict = {
step("r1 : Verify upstream IIF interface")
iif = "r1-r2-eth1"
result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: upstream IIF interface is present\n Error: {}".format(
- tc_name, result
- ))
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "r1: upstream IIF interface is present\n Error: {}".format(tc_name, result)
+ )
step("r1 : Verify upstream join state and join timer")
result = verify_join_state_and_timer(
tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: upstream join state is joined and timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r1 : Verify PIM state")
result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: PIM state is up\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: PIM state is up\n Error: {}".format(
tc_name, result
- ))
+ )
step("r1 : Verify ip mroutes")
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: mroutes are still present\n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: mroutes are still present\n Error: {}".format(
tc_name, result
- ))
+ )
step("r1: Make RP reachable")
intf = "r1-r2-eth1"
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE, oif, rp_address_2, SOURCE, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: rp-info is present for group 225.1.1.1 \n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step(
"r1 : Verify RPF interface updated in mroute when higher preferred"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S, G) upstream join state is joined and join"
- " timer is running \n Error: {}".format(
- tc_name, result
- ))
+ " timer is running \n Error: {}".format(tc_name, result)
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r2: Verify (S, G) ip mroutes")
oif = "none"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r2: Verify (S, G) ip mroutes")
oif = "none"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r2: Verify (S, G) ip mroutes")
oif = "none"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r2: Verify (S, G) ip mroutes")
oif = "none"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r4: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r4: Verify (S, G) ip mroutes")
oif = "none"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r2: Verify (S, G) ip mroutes")
oif = "none"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r4: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r4: Verify (S, G) ip mroutes")
oif = "none"
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
iif = "r1-r3-eth2"
oif = "r1-r0-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: (*,G) mroutes are not cleared after shut of R1 to R3 link\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r2: Verify (*, G) ip mroutes")
dut = "r2"
iif = "lo"
oif = "r2-r3-eth1"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: (*,G) mroutes are not cleared after shut of R1 to R3 link\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: Verify (*, G) ip mroutes")
dut = "r3"
iif = "r3-r2-eth1"
oif = "r3-r1-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r3: (*,G) mroutes are not cleared after shut of R1 to R3 link\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r3: No shutdown the link from R1 to R3 from R3 node")
dut = "r3"
iif = "r1-r2-eth1"
oif = "r1-r0-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: (*,G) mroutes are not cleared after shut of R1 to R0 link\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r2: Verify (*, G) ip mroutes cleared")
dut = "r2"
iif = "lo"
oif = "r2-r1-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: (*,G) mroutes are not cleared after shut of R1 to R0 link\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
write_test_footer(tc_name)
iif = "r1-r2-eth1"
oif = "r1-r0-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: (*,G) mroutes are not cleared after shut of R1 to R2 and R3 link\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
step("r2: Verify (*, G) ip mroutes cleared")
dut = "r2"
iif = "lo"
oif = "r2-r1-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r2: (*,G) mroutes are not cleared after shut of R1 to R2 and R3 link\n Error: {}".format(
- tc_name, result
- ))
+ tc_name, result
+ )
+ )
write_test_footer(tc_name)
pytestmark = [pytest.mark.ospfd]
+
class TemplateTopo(Topo):
"Test topology builder"
pytest.skip(tgen.errors)
logger.info("Disabling SR on rt6")
- tgen.net["rt6"].cmd(
- 'vtysh -c "conf t" -c "router ospf" -c "no segment-routing on"'
- )
+ tgen.net["rt6"].cmd('vtysh -c "conf t" -c "router ospf" -c "no segment-routing on"')
for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
router_compare_json_output(
# Expected changes:
# -All commands should be rejected
#
-#def test_ospf_invalid_config_step11():
+# def test_ospf_invalid_config_step11():
# logger.info("Test (step 11): check if invalid configuration is rejected")
# tgen = get_topogen()
#
pytestmark = [pytest.mark.ospfd]
+
class OspfTeTopo(Topo):
"Test topology builder"
# Note that all routers must discover the same Network Topology, so the same TED.
+
def test_step1():
"Step1: Check initial topology"
tgen = setup_testcase("Step2: Shutdown interface between r1 & r2")
- tgen.net["r1"].cmd(
- 'vtysh -c "conf t" -c "interface r1-eth1" -c "shutdown"'
- )
- tgen.net["r2"].cmd(
- 'vtysh -c "conf t" -c "interface r2-eth1" -c "shutdown"'
- )
+ tgen.net["r1"].cmd('vtysh -c "conf t" -c "interface r1-eth1" -c "shutdown"')
+ tgen.net["r2"].cmd('vtysh -c "conf t" -c "interface r2-eth1" -c "shutdown"')
for rname in ["r1", "r2", "r3", "r4"]:
compare_ted_json_output(tgen, rname, "ted_step2.json")
tgen = setup_testcase("Step3: Disable Inter-AS on r3")
- tgen.net["r3"].cmd(
- 'vtysh -c "conf t" -c "router ospf" -c "no mpls-te inter-as"'
- )
+ tgen.net["r3"].cmd('vtysh -c "conf t" -c "router ospf" -c "no mpls-te inter-as"')
for rname in ["r1", "r2", "r3", "r4"]:
compare_ted_json_output(tgen, rname, "ted_step3.json")
tgen = setup_testcase("Step4: Enable Segment Routing on r1 & r2")
- tgen.net["r1"].cmd(
- 'vtysh -c "conf t" -c "router ospf" -c "segment-routing on"'
- )
+ tgen.net["r1"].cmd('vtysh -c "conf t" -c "router ospf" -c "segment-routing on"')
tgen.net["r1"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "segment-routing global-block 20000 23999"'
)
tgen.net["r1"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "segment-routing prefix 10.0.255.1/32 index 10"'
)
- tgen.net["r2"].cmd(
- 'vtysh -c "conf t" -c "router ospf" -c "segment-routing on"'
- )
+ tgen.net["r2"].cmd('vtysh -c "conf t" -c "router ospf" -c "segment-routing on"')
tgen.net["r2"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "segment-routing node-msd 16"'
)
tgen = setup_testcase("Step5: Re-enable interface between r1 & r2")
- tgen.net["r1"].cmd(
- 'vtysh -c "conf t" -c "interface r1-eth1" -c "no shutdown"'
- )
- tgen.net["r2"].cmd(
- 'vtysh -c "conf t" -c "interface r2-eth1" -c "no shutdown"'
- )
+ tgen.net["r1"].cmd('vtysh -c "conf t" -c "interface r1-eth1" -c "no shutdown"')
+ tgen.net["r2"].cmd('vtysh -c "conf t" -c "interface r2-eth1" -c "no shutdown"')
for rname in ["r1", "r2", "r3", "r4"]:
compare_ted_json_output(tgen, rname, "ted_step5.json")
tgen = setup_testcase("Step7: Disable OSPF on r4")
- tgen.net["r4"].cmd(
- 'vtysh -c "conf t" -c "no router ospf"'
- )
+ tgen.net["r4"].cmd('vtysh -c "conf t" -c "no router ospf"')
for rname in ["r1", "r2", "r3"]:
compare_ted_json_output(tgen, rname, "ted_step7.json")
* Full/DROther
* Full/Backup
"""
- result = tgen.gears[router].vtysh_cmd('show ip ospf neighbor json',
- isjson=True)
- if topotest.json_cmp(result, {"neighbors": {neighbor: [
- {"state": "Full/DR"}]}}) is None:
+ result = tgen.gears[router].vtysh_cmd(
+ "show ip ospf neighbor json", isjson=True
+ )
+ if (
+ topotest.json_cmp(
+ result, {"neighbors": {neighbor: [{"state": "Full/DR"}]}}
+ )
+ is None
+ ):
return None
- if topotest.json_cmp(result, {"neighbors": {neighbor: [
- {"state": "Full/DROther"}]}}) is None:
+ if (
+ topotest.json_cmp(
+ result, {"neighbors": {neighbor: [{"state": "Full/DROther"}]}}
+ )
+ is None
+ ):
return None
- return topotest.json_cmp(result, {"neighbors": {neighbor: [
- {"state": "Full/Backup"}]}})
+ return topotest.json_cmp(
+ result, {"neighbors": {neighbor: [{"state": "Full/Backup"}]}}
+ )
- _, result = topotest.run_and_expect(run_command_and_expect, None,
- count=130, wait=1)
+ _, result = topotest.run_and_expect(
+ run_command_and_expect, None, count=130, wait=1
+ )
assertmsg = '"{}" convergence failure'.format(router)
assert result is None, assertmsg
-
def expect_ospfv3_neighbor_full(router, neighbor):
"Wait until OSPFv3 convergence."
logger.info("waiting OSPFv3 router '{}'".format(router))
topotest.router_json_cmp,
tgen.gears[router],
"show ipv6 ospf6 database inter-prefix detail json",
- {"areaScopedLinkStateDb": [{
- "areaId": area,
- "lsa": [{
- "prefix": prefix,
- "metric": metric,
- }]}]},
+ {
+ "areaScopedLinkStateDb": [
+ {
+ "areaId": area,
+ "lsa": [
+ {
+ "prefix": prefix,
+ "metric": metric,
+ }
+ ],
+ }
+ ]
+ },
)
_, result = topotest.run_and_expect(test_func, None, count=4, wait=1)
assertmsg = '"{}" convergence failure'.format(router)
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ result = verify_rib(
+ tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Bring up OSPFd daemon on R0.")
start_router_daemons(tgen, "r0", ["ospfd"])
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ result = verify_rib(
+ tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Bring up staticd daemon on R0.")
start_router_daemons(tgen, "r0", ["staticd"])
shutdown_bringup_interface(tgen, dut, intf, False)
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
protocol = "ospf"
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
for intfr in range(1, 7):
intf = topo["routers"]["r1"]["links"]["r0-link{}".format(intfr)]["interface"]
result = verify_ospf_rib(
tgen, dut, input_dict, next_hop=nh, attempts=5, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
protocol = "ospf"
result = verify_rib(
attempts=5,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Re configure the static route in R0.")
dut = "r0"
result = verify_ospf_rib(
tgen, dut, input_dict, next_hop=nh, attempts=5, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
protocol = "ospf"
result = verify_rib(
attempts=5,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Reconfigure the static route in R0.Change ECMP value to 2.")
dut = "r0"
result = verify_ospf_rib(
tgen, dut, input_dict, next_hop=nh, attempts=5, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
protocol = "ospf"
result = verify_rib(
attempts=5,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
write_test_footer(tc_name)
shutdown_bringup_interface(tgen, dut, intf, False)
result = verify_ospf_neighbor(tgen, topo, dut, lan=True, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r0: OSPF neighbors-hip is up \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r0: OSPF neighbors-hip is up \n Error: {}".format(
tc_name, result
- ))
+ )
step("No Shut interface on R0")
dut = "r0"
}
}
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are present in fib \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are present in fib \n Error: {}".format(
tc_name, result
- ))
+ )
step("Delete and reconfigure prefix list.")
# Create ip prefix list
}
}
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
pfx_list = {
"r0": {
}
}
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
write_test_footer(tc_name)
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, attempts=2, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, attempts=2, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step(
"configure the route map with the same name that is used "
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
# Create route map
routemaps = {"r0": {"route_maps": {"rmap_ipv4": [{"action": "deny"}]}}}
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
step("Delete the route map.")
# Create route map
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: OSPF routes are present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
- ))
+ )
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
- assert result is not True, ("Testcase {} : Failed \n "
- "r1: routes are still present \n Error: {}".format(
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
- ))
+ )
write_test_footer(tc_name)
if result is not True:
break
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: OSPF routes are present after deleting ip address of newly "
- "configured interface of R0 \n Error: {}".format(
- tc_name, result
- ))
+ "configured interface of R0 \n Error: {}".format(tc_name, result)
+ )
protocol = "ospf"
result = verify_rib(
attempts=5,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: OSPF routes are present in fib after deleting ip address of newly "
- "configured interface of R0 \n Error: {}".format(
- tc_name, result
- ))
+ "configured interface of R0 \n Error: {}".format(tc_name, result)
+ )
step("Add back the deleted ip address on newly configured interface of R0")
topo1 = {
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh, expected=False)
if result is not True:
break
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: OSPF routes are present after deleting ip address of newly "
- "configured loopback of R0 \n Error: {}".format(
- tc_name, result
- ))
+ "configured loopback of R0 \n Error: {}".format(tc_name, result)
+ )
protocol = "ospf"
result = verify_rib(
next_hop=nh,
expected=False,
)
- assert result is not True, ("Testcase {} : Failed \n "
+ assert result is not True, (
+ "Testcase {} : Failed \n "
"r1: OSPF routes are present in fib after deleting ip address of newly "
- "configured loopback of R0 \n Error: {}".format(
- tc_name, result
- ))
+ "configured loopback of R0 \n Error: {}".format(tc_name, result)
+ )
step("Add back the deleted ip address on newly configured interface of R0")
topo1 = {
tgen.start_router()
+
def teardown_module(_mod):
"Teardown the pytest environment"
tgen = get_topogen()
tgen.stop_topology()
+
def test_converge_protocols():
"Wait for protocol convergence"
topotest.sleep(10, "Waiting for OSPF convergence")
+
def ospf_configure_suppress_fa(router_name, area):
"Configure OSPF suppress-fa in router_name"
tgen = get_topogen()
router = tgen.gears[router_name]
- router.vtysh_cmd("conf t\nrouter ospf\narea {} nssa suppress-fa\nexit\n".format(area))
+ router.vtysh_cmd(
+ "conf t\nrouter ospf\narea {} nssa suppress-fa\nexit\n".format(area)
+ )
+
def ospf_unconfigure_suppress_fa(router_name, area):
"Remove OSPF suppress-fa in router_name"
tgen = get_topogen()
router = tgen.gears[router_name]
- router.vtysh_cmd("conf t\nrouter ospf\nno area {} nssa suppress-fa\nexit\n".format(area))
+ router.vtysh_cmd(
+ "conf t\nrouter ospf\nno area {} nssa suppress-fa\nexit\n".format(area)
+ )
+
def ospf_get_lsa_type5(router_name):
"Return a dict with link state id as key and forwarding addresses as value"
result[lsa] = re1.group(1)
return result
-@pytest.fixture(scope='module', name='original')
+
+@pytest.fixture(scope="module", name="original")
def test_ospf_set_suppress_fa():
"Test OSPF area [x] nssa suppress-fa"
# in the test_ospf_unset_supress_fa
return initial
+
def test_ospf_unset_supress_fa(original):
"Test OSPF no area [x] nssa suppress-fa"
result4 = verify_rib(
tgen, addr_type, dut, input_dict, protocol=protocol, expected=False
)
- assert result4 is not True, ("Testcase {} : Failed \n"
- "routes are still present \n Error: {}".format(
+ assert (
+ result4 is not True
+ ), "Testcase {} : Failed \n" "routes are still present \n Error: {}".format(
tc_name, result4
- ))
+ )
step("vm4 should be present in FRR1")
dut = "r1"
result4 = verify_rib(
tgen, addr_type, dut, input_dict, protocol=protocol, expected=False
)
- assert result4 is not True, ("Testcase {} : Failed \n"
- "routes are still present \n Error: {}".format(
+ assert (
+ result4 is not True
+ ), "Testcase {} : Failed \n" "routes are still present \n Error: {}".format(
tc_name, result4
- ))
+ )
step("vm4 should be present in FRR1")
dut = "r1"