actual = (
net["r%s" % i]
.cmd(
- 'vtysh -c "show ip route" | sed -e \'/^Codes: /,/^\s*$/d\' | env LC_ALL=en_US.UTF-8 sort 2> /dev/null'
+ "vtysh -c \"show ip route\" | sed -e '/^Codes: /,/^\s*$/d' | env LC_ALL=en_US.UTF-8 sort 2> /dev/null"
)
.rstrip()
)
actual = (
net["r%s" % i]
.cmd(
- 'vtysh -c "show ipv6 route" | sed -e \'/^Codes: /,/^\s*$/d\' | env LC_ALL=en_US.UTF-8 sort 2> /dev/null'
+ "vtysh -c \"show ipv6 route\" | sed -e '/^Codes: /,/^\s*$/d' | env LC_ALL=en_US.UTF-8 sort 2> /dev/null"
)
.rstrip()
)
switch.add_link(tgen.gears["r2"])
switch.add_link(tgen.gears["r3"])
+
def setup_module(mod):
"Sets up the pytest environment"
tgen = Topogen(BFDTopo, mod.__name__)
pytestmark = [pytest.mark.bfdd, pytest.mark.isisd]
+
class TemplateTopo(Topo):
"Test topology builder"
pytestmark = [pytest.mark.bfdd, pytest.mark.ospfd]
+
class TemplateTopo(Topo):
"Test topology builder"
write_test_footer(tc_name)
+
@pytest.mark.parametrize("ecmp_num", ["8", "16", "32"])
@pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
def test_ecmp_after_clear_bgp(request, ecmp_num, test_type):
addr_type,
dut,
input_dict_1,
- next_hop=NEXT_HOPS[addr_type][:int(ecmp_num)],
+ next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],
protocol=protocol,
)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
addr_type,
dut,
input_dict_1,
- next_hop=NEXT_HOPS[addr_type][:int(ecmp_num)],
+ next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],
protocol=protocol,
)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
write_test_footer(tc_name)
+
@pytest.mark.parametrize("ecmp_num", ["8", "16", "32"])
@pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
def test_ecmp_after_clear_bgp(request, ecmp_num, test_type):
addr_type,
dut,
input_dict_1,
- next_hop=NEXT_HOPS[addr_type][:int(ecmp_num)],
+ next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],
protocol=protocol,
)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
addr_type,
dut,
input_dict_1,
- next_hop=NEXT_HOPS[addr_type][:int(ecmp_num)],
+ next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],
protocol=protocol,
)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
local_host.run(cmd_str)
remote_host.run(cmd_str)
+
def check_mac(dut, vni, mac, m_type, esi, intf, ping_gw=False, tgen=None):
"""
checks if mac is present and if desination matches the one provided
pytestmark = [pytest.mark.bgpd, pytest.mark.ospfd]
+
class TemplateTopo(Topo):
"Test topology builder"
break
count += 1
sleep(1)
- #tgen.mininet_cli()
+ # tgen.mininet_cli()
assertmsg = "BGP Peer 10.4.4.4 did not connect"
assert passed, assertmsg
associated_int = r1_snmp.get(
"mplsL3VpnVrfAssociatedInterfaces.{}".format(snmp_str_to_oid("VRF-a"))
)
- assertmsg = "mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format(
- associated_int
+ assertmsg = (
+ "mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format(
+ associated_int
+ )
)
assert associated_int == "3", assertmsg
"unknown(0)",
"ipv4(1)",
"unknown(0)",
- ],
+ ],
"mplsL3VpnVrfRteInetCidrNextHop": [
"C0 A8 64 0A",
"C0 A8 C8 0A",
"bgp(14)",
"local(2)",
],
- "mplsL3VpnVrfRteInetCidrNextHopAS": ["65001", "65001", "0", "65001", "0", "65001", "0"],
+ "mplsL3VpnVrfRteInetCidrNextHopAS": [
+ "65001",
+ "65001",
+ "0",
+ "65001",
+ "0",
+ "65001",
+ "0",
+ ],
"mplsL3VpnVrfRteInetCidrMetric1": ["0", "0", "20", "0", "0", "0", "0"],
"mplsL3VpnVrfRteInetCidrMetric2": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
"mplsL3VpnVrfRteInetCidrMetric3": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
"active(1)",
"active(1)",
"active(1)",
- "active(1)",
+ "active(1)",
],
}
fifo = sys.argv[1]
while True:
- pipe = open(fifo, 'r')
+ pipe = open(fifo, "r")
with pipe:
line = pipe.readline().strip()
if line != "":
fifo = sys.argv[1]
while True:
- pipe = open(fifo, 'r')
+ pipe = open(fifo, "r")
with pipe:
line = pipe.readline().strip()
if line != "":
fifo = sys.argv[1]
while True:
- pipe = open(fifo, 'r')
+ pipe = open(fifo, "r")
with pipe:
line = pipe.readline().strip()
if line != "":
fifo = sys.argv[1]
while True:
- pipe = open(fifo, 'r')
+ pipe = open(fifo, "r")
with pipe:
line = pipe.readline().strip()
if line != "":
pytestmark = [pytest.mark.bgpd, pytest.mark.ospfd]
+
def test_adjacencies():
CliOnFail = None
# For debugging, uncomment the next line
pytestmark = [pytest.mark.bgpd, pytest.mark.ospfd]
+
def test_check_linux_vrf():
CliOnFail = None
# For debugging, uncomment the next line
"""
Waits for the BGP connection between a given router and a given peer
(specified by its IP address) to be established. If the connection is
- not established within a given timeout, then an exception is raised.
+ not established within a given timeout, then an exception is raised.
"""
tgen = get_topogen()
router = tgen.routers()[router_name]
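For reference, a minimal sketch (not the actual helper body) of how such a wait is commonly written with the topotest helpers that appear elsewhere in this diff; the "show bgp neighbors ... json" command, the "bgpState" key, and the peer_ip parameter name are assumptions used only for illustration:

    from functools import partial
    from lib import topotest

    def _peer_established(router, peer_ip):
        # Poll the peer state via vtysh JSON output (assumed layout:
        # {"<peer_ip>": {"bgpState": "Established"}}).
        output = router.vtysh_cmd(
            "show bgp neighbors {} json".format(peer_ip), isjson=True
        )
        return output.get(peer_ip, {}).get("bgpState") == "Established"

    # Retry for up to ~60 seconds before declaring the session down.
    test_func = partial(_peer_established, router, peer_ip)
    _, result = topotest.run_and_expect(test_func, True, count=60, wait=1)
    assert result is True, "BGP peer {} did not reach Established".format(peer_ip)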
# Required to instantiate the topology builder class.
from mininet.topo import Topo
-#Basic scenario for BGP-LU. Nodes are directly connected.
-#Node 3 is advertising many routes to 2, which advertises them
-#as BGP-LU to 1; this way we get routes with actual labels, as
-#opposed to implicit-null routes in the 2-node case.
+# Basic scenario for BGP-LU. Nodes are directly connected.
+# Node 3 is advertising many routes to 2, which advertises them
+# as BGP-LU to 1; this way we get routes with actual labels, as
+# opposed to implicit-null routes in the 2-node case.
#
# AS1 BGP-LU AS2 iBGP AS2
-#+-----+ +-----+ +-----+
-#| |.1 .2| |.2 .3| |
-#| 1 +----------------+ 2 +-----------------+ 3 |
-#| | 10.0.0.0/24 | | 10.0.1.0/24 | |
-#+-----+ +-----+ +-----+
+# +-----+ +-----+ +-----+
+# | |.1 .2| |.2 .3| |
+# | 1 +----------------+ 2 +-----------------+ 3 |
+# | | 10.0.0.0/24 | | 10.0.1.0/24 | |
+# +-----+ +-----+ +-----+
+
class TemplateTopo(Topo):
"Test topology builder"
switch.add_link(tgen.gears["R3"])
-
def setup_module(mod):
"Sets up the pytest environment"
# This function initiates the topology build with Topogen...
# This function tears down the whole topology.
tgen.stop_topology()
+
def check_labelpool(router):
json_file = "{}/{}/labelpool.summ.json".format(CWD, router.name)
expected = json.loads(open(json_file).read())
- test_func = partial(topotest.router_json_cmp, router, "show bgp labelpool summary json", expected)
+ test_func = partial(
+ topotest.router_json_cmp, router, "show bgp labelpool summary json", expected
+ )
_, result = topotest.run_and_expect(test_func, None, count=20, wait=1)
assertmsg = '"{}" JSON output mismatches - Did not converge'.format(router.name)
assert result is None, assertmsg
-
+
+
def test_converge_bgplu():
"Wait for protocol convergence"
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- #tgen.mininet_cli();
+ # tgen.mininet_cli();
r1 = tgen.gears["R1"]
r2 = tgen.gears["R2"]
check_labelpool(r1)
check_labelpool(r2)
+
def test_clear_bgplu():
"Wait for protocol convergence"
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- #tgen.mininet_cli();
+ # tgen.mininet_cli();
r1 = tgen.gears["R1"]
r2 = tgen.gears["R2"]
check_labelpool(r1)
check_labelpool(r2)
+
def test_memory_leak():
"Run the memory leak test and report results."
tgen = get_topogen()
pytestmark = [pytest.mark.bgpd, pytest.mark.ospfd]
+
def test_add_routes():
CliOnFail = None
# For debugging, uncomment the next line
from mininet.topo import Topo
-#TODO: select markers based on daemons used during test
+# TODO: select markers based on daemons used during test
# pytest module level markers
"""
pytestmark = pytest.mark.bfdd # single marker
from lib.topojson import build_topo_from_json, build_config_from_json
-#TODO: select markers based on daemons used during test
+# TODO: select markers based on daemons used during test
# pytest module level markers
"""
pytestmark = pytest.mark.bfdd # single marker
from lib.topojson import build_topo_from_json, build_config_from_json
-#TODO: select markers based on daemons used during test
+# TODO: select markers based on daemons used during test
# pytest module level markers
"""
pytestmark = pytest.mark.bfdd # single marker
from lib.topojson import build_topo_from_json, build_config_from_json
-#TODO: select markers based on daemons used during test
+# TODO: select markers based on daemons used during test
# pytest module level markers
"""
pytestmark = pytest.mark.bfdd # single marker
f_in.close()
f_out.close()
+
def setup_module(mod):
"Sets up the pytest environment"
tgen = Topogen(TemplateTopo, mod.__name__)
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
-sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, "../"))
# pylint: disable=C0413
# Import topogen and topotest helpers
# Global multi-dimensional dictionary containing all expected outputs
outputs = {}
+
class TemplateTopo(Topo):
"Test topology builder"
+
def build(self, *_args, **_opts):
"Build function"
tgen = get_topogen(self)
#
# Define FRR Routers
#
- for router in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
+ for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
tgen.add_router(router)
#
# Define connections
#
- switch = tgen.add_switch('s1')
- switch.add_link(tgen.gears['rt1'], nodeif="eth-sw1")
- switch.add_link(tgen.gears['rt2'], nodeif="eth-sw1")
- switch.add_link(tgen.gears['rt3'], nodeif="eth-sw1")
+ switch = tgen.add_switch("s1")
+ switch.add_link(tgen.gears["rt1"], nodeif="eth-sw1")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-sw1")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-sw1")
- switch = tgen.add_switch('s2')
- switch.add_link(tgen.gears['rt2'], nodeif="eth-rt4")
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt2")
+ switch = tgen.add_switch("s2")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2")
- switch = tgen.add_switch('s4')
- switch.add_link(tgen.gears['rt3'], nodeif="eth-rt5")
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt3")
+ switch = tgen.add_switch("s4")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3")
- switch = tgen.add_switch('s6')
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt5")
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt4")
+ switch = tgen.add_switch("s6")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt5")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt4")
- switch = tgen.add_switch('s7')
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt6")
- switch.add_link(tgen.gears['rt6'], nodeif="eth-rt4")
+ switch = tgen.add_switch("s7")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt6")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt4")
- switch = tgen.add_switch('s8')
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt6")
- switch.add_link(tgen.gears['rt6'], nodeif="eth-rt5")
+ switch = tgen.add_switch("s8")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt6")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt5")
def setup_module(mod):
# For all registered routers, load the zebra configuration file
for rname, router in router_list.items():
router.load_config(
- TopoRouter.RD_ZEBRA,
- os.path.join(CWD, '{}/zebra.conf'.format(rname))
+ TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
)
router.load_config(
- TopoRouter.RD_ISIS,
- os.path.join(CWD, '{}/isisd.conf'.format(rname))
+ TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
)
tgen.start_router()
+
def teardown_module(mod):
"Teardown the pytest environment"
tgen = get_topogen()
# This function tears down the whole topology.
tgen.stop_topology()
+
def router_compare_json_output(rname, command, reference):
"Compare router JSON output"
assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)
assert diff is None, assertmsg
+
#
# Step 1
#
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
router_compare_json_output(
- rname,
+ rname,
"show yang operational-data /frr-interface:lib isisd",
"step1/show_yang_interface_isis_adjacencies.ref",
)
+
def test_rib_ipv4_step1():
logger.info("Test (step 1): verify IPv4 RIB")
tgen = get_topogen()
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
router_compare_json_output(
rname, "show ip route isis json", "step1/show_ip_route.ref"
)
+
def test_rib_ipv6_step1():
logger.info("Test (step 1): verify IPv6 RIB")
tgen = get_topogen()
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']:
+ for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
router_compare_json_output(
rname, "show ipv6 route isis json", "step1/show_ipv6_route.ref"
)
+
#
# Step 2
#
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Disabling setting the attached-bit on RT2 and RT4')
- tgen.net['rt2'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no attached-bit send"')
- tgen.net['rt4'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no attached-bit send"')
+ logger.info("Disabling setting the attached-bit on RT2 and RT4")
+ tgen.net["rt2"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no attached-bit send"'
+ )
+ tgen.net["rt4"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no attached-bit send"'
+ )
- for rname in ['rt1', 'rt6']:
+ for rname in ["rt1", "rt6"]:
router_compare_json_output(
rname, "show ip route isis json", "step2/show_ip_route.ref"
)
+
def test_rib_ipv6_step2():
logger.info("Test (step 2): verify IPv6 RIB")
tgen = get_topogen()
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt6']:
+ for rname in ["rt1", "rt6"]:
router_compare_json_output(
rname, "show ipv6 route isis json", "step2/show_ipv6_route.ref"
)
+
#
# Step 3
#
# -disable processing an LSP with attach bit set
#
# Expected changes:
-# -RT1 and RT6 should not install a default route
+# -RT1 and RT6 should not install a default route
#
def test_rib_ipv4_step3():
logger.info("Test (step 3): verify IPv4 RIB")
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('Enable setting the attached-bit on RT2 and RT4')
- tgen.net['rt2'].cmd('vtysh -c "conf t" -c "router isis 1" -c "attached-bit send"')
- tgen.net['rt4'].cmd('vtysh -c "conf t" -c "router isis 1" -c "attached-bit send"')
+ logger.info("Enable setting the attached-bit on RT2 and RT4")
+ tgen.net["rt2"].cmd('vtysh -c "conf t" -c "router isis 1" -c "attached-bit send"')
+ tgen.net["rt4"].cmd('vtysh -c "conf t" -c "router isis 1" -c "attached-bit send"')
- logger.info('Disable processing received attached-bit in LSP on RT1 and RT6')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "attached-bit receive ignore"')
- tgen.net['rt6'].cmd('vtysh -c "conf t" -c "router isis 1" -c "attached-bit receive ignore"')
+ logger.info("Disable processing received attached-bit in LSP on RT1 and RT6")
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "attached-bit receive ignore"'
+ )
+ tgen.net["rt6"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "attached-bit receive ignore"'
+ )
- for rname in ['rt1', 'rt6']:
+ for rname in ["rt1", "rt6"]:
router_compare_json_output(
rname, "show ip route isis json", "step3/show_ip_route.ref"
)
+
def test_rib_ipv6_step3():
logger.info("Test (step 3): verify IPv6 RIB")
tgen = get_topogen()
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt6']:
+ for rname in ["rt1", "rt6"]:
router_compare_json_output(
rname, "show ipv6 route isis json", "step3/show_ipv6_route.ref"
)
+
#
# Step 4
#
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- logger.info('restore default processing on received attached-bit in LSP on RT1 and RT6')
- tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no attached-bit receive ignore"')
- tgen.net['rt6'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no attached-bit receive ignore"')
-
- for rname in ['rt1', 'rt6']:
+ logger.info(
+ "restore default processing on received attached-bit in LSP on RT1 and RT6"
+ )
+ tgen.net["rt1"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no attached-bit receive ignore"'
+ )
+ tgen.net["rt6"].cmd(
+ 'vtysh -c "conf t" -c "router isis 1" -c "no attached-bit receive ignore"'
+ )
+
+ for rname in ["rt1", "rt6"]:
router_compare_json_output(
- rname, "show ip route isis json", "step4/show_ip_route.ref")
+ rname, "show ip route isis json", "step4/show_ip_route.ref"
+ )
+
def test_rib_ipv6_step4():
logger.info("Test (step 4): verify IPv6 RIB")
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ['rt1', 'rt6']:
+ for rname in ["rt1", "rt6"]:
router_compare_json_output(
- rname, "show ipv6 route isis json", "step4/show_ipv6_route.ref")
+ rname, "show ipv6 route isis json", "step4/show_ipv6_route.ref"
+ )
+
# Memory leak test template
def test_memory_leak():
"Run the memory leak test and report results."
tgen = get_topogen()
if not tgen.is_memleak_enabled():
- pytest.skip('Memory leak test/report is disabled')
+ pytest.skip("Memory leak test/report is disabled")
tgen.report_memory_leaks()
-if __name__ == '__main__':
+
+if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
f_in.close()
f_out.close()
+
def setup_module(mod):
"Sets up the pytest environment"
tgen = Topogen(TemplateTopo, mod.__name__)
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
-sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, "../"))
# pylint: disable=C0413
# Import topogen and topotest helpers
class TemplateTopo(Topo):
"Test topology builder"
+
def build(self, *_args, **_opts):
"Build function"
tgen = get_topogen(self)
#
# Define FRR Routers
#
- for router in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'dst']:
+ for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "dst"]:
tgen.add_router(router)
#
# Define connections
#
- switch = tgen.add_switch('s1')
- switch.add_link(tgen.gears['rt1'], nodeif="eth-sw1")
- switch.add_link(tgen.gears['rt2'], nodeif="eth-sw1")
- switch.add_link(tgen.gears['rt3'], nodeif="eth-sw1")
+ switch = tgen.add_switch("s1")
+ switch.add_link(tgen.gears["rt1"], nodeif="eth-sw1")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-sw1")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-sw1")
+
+ switch = tgen.add_switch("s2")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-1")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-1")
- switch = tgen.add_switch('s2')
- switch.add_link(tgen.gears['rt2'], nodeif="eth-rt4-1")
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt2-1")
+ switch = tgen.add_switch("s3")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-2")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-2")
- switch = tgen.add_switch('s3')
- switch.add_link(tgen.gears['rt2'], nodeif="eth-rt4-2")
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt2-2")
+ switch = tgen.add_switch("s4")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-1")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-1")
- switch = tgen.add_switch('s4')
- switch.add_link(tgen.gears['rt3'], nodeif="eth-rt5-1")
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt3-1")
+ switch = tgen.add_switch("s5")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-2")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-2")
- switch = tgen.add_switch('s5')
- switch.add_link(tgen.gears['rt3'], nodeif="eth-rt5-2")
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt3-2")
+ switch = tgen.add_switch("s6")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt5")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt4")
- switch = tgen.add_switch('s6')
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt5")
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt4")
+ switch = tgen.add_switch("s7")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt6")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt4")
- switch = tgen.add_switch('s7')
- switch.add_link(tgen.gears['rt4'], nodeif="eth-rt6")
- switch.add_link(tgen.gears['rt6'], nodeif="eth-rt4")
+ switch = tgen.add_switch("s8")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt6")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt5")
- switch = tgen.add_switch('s8')
- switch.add_link(tgen.gears['rt5'], nodeif="eth-rt6")
- switch.add_link(tgen.gears['rt6'], nodeif="eth-rt5")
+ switch = tgen.add_switch("s9")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-dst")
+ switch.add_link(tgen.gears["dst"], nodeif="eth-rt6")
- switch = tgen.add_switch('s9')
- switch.add_link(tgen.gears['rt6'], nodeif="eth-dst")
- switch.add_link(tgen.gears['dst'], nodeif="eth-rt6")
def setup_module(mod):
"Sets up the pytest environment"
# For all registered routers, load the zebra configuration file
for rname, router in router_list.iteritems():
router.load_config(
- TopoRouter.RD_ZEBRA,
- os.path.join(CWD, '{}/zebra.conf'.format(rname))
+ TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
)
router.load_config(
- TopoRouter.RD_ISIS,
- os.path.join(CWD, '{}/isisd.conf'.format(rname))
+ TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
)
router.load_config(
- TopoRouter.RD_PATH,
- os.path.join(CWD, '{}/pathd.conf'.format(rname))
+ TopoRouter.RD_PATH, os.path.join(CWD, "{}/pathd.conf".format(rname))
)
router.load_config(
- TopoRouter.RD_BGP,
- os.path.join(CWD, '{}/bgpd.conf'.format(rname))
+ TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format(rname))
)
tgen.start_router()
+
def teardown_module(mod):
"Teardown the pytest environment"
tgen = get_topogen()
# This function tears down the whole topology.
tgen.stop_topology()
+
def setup_testcase(msg):
logger.info(msg)
tgen = get_topogen()
return tgen
+
def print_cmd_result(rname, command):
print(get_topogen().gears[rname].vtysh_cmd(command, isjson=False))
+
def compare_json_test(router, command, reference, exact):
output = router.vtysh_cmd(command, isjson=True)
result = topotest.json_cmp(output, reference)
# Note: topotest.json_cmp() just checks on inclusion of keys.
# For exact matching also compare the other way around.
if not result and exact:
- return topotest.json_cmp(reference, output)
+ return topotest.json_cmp(reference, output)
else:
- return result
+ return result
+
def cmp_json_output(rname, command, reference, exact=False):
"Compare router JSON output"
logger.info('Comparing router "%s" "%s" output', rname, command)
tgen = get_topogen()
- filename = '{}/{}/{}'.format(CWD, rname, reference)
+ filename = "{}/{}/{}".format(CWD, rname, reference)
expected = json.loads(open(filename).read())
# Run test function until we get a result. Wait at most 60 seconds.
- test_func = partial(compare_json_test,
- tgen.gears[rname], command, expected, exact)
+ test_func = partial(compare_json_test, tgen.gears[rname], command, expected, exact)
_, diff = topotest.run_and_expect(test_func, None, count=120, wait=0.5)
assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)
assert diff is None, assertmsg
+
def cmp_json_output_exact(rname, command, reference):
return cmp_json_output(rname, command, reference, True)
-def add_candidate_path(rname, endpoint, pref, name, segment_list='default'):
- get_topogen().net[rname].cmd(''' \
+
+def add_candidate_path(rname, endpoint, pref, name, segment_list="default"):
+ get_topogen().net[rname].cmd(
+ """ \
vtysh -c "conf t" \
-c "segment-routing" \
-c "traffic-eng" \
- -c "policy color 1 endpoint ''' + endpoint + '''" \
- -c "candidate-path preference ''' + str(pref) + ''' name ''' + name + ''' explicit segment-list ''' + segment_list + '''"''')
+ -c "policy color 1 endpoint """
+ + endpoint
+ + """" \
+ -c "candidate-path preference """
+ + str(pref)
+ + """ name """
+ + name
+ + """ explicit segment-list """
+ + segment_list
+ + '''"'''
+ )
+
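After reformatting, the string concatenation above is harder to scan, so for orientation only (the argument values are made up, not taken from a test): with endpoint "6.6.6.6", preference 100 and the default segment list, the helper builds roughly this single vtysh invocation:

    # Illustration only; whitespace from the line continuations is collapsed
    # by the shell:
    #   vtysh -c "conf t" -c "segment-routing" -c "traffic-eng" \
    #         -c "policy color 1 endpoint 6.6.6.6" \
    #         -c "candidate-path preference 100 name default explicit segment-list default"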
def delete_candidate_path(rname, endpoint, pref):
- get_topogen().net[rname].cmd(''' \
+ get_topogen().net[rname].cmd(
+ """ \
vtysh -c "conf t" \
-c "segment-routing" \
-c "traffic-eng" \
- -c "policy color 1 endpoint ''' + endpoint + '''" \
- -c "no candidate-path preference ''' + str(pref) + '''"''')
+ -c "policy color 1 endpoint """
+ + endpoint
+ + """" \
+ -c "no candidate-path preference """
+ + str(pref)
+ + '''"'''
+ )
+
def add_segment(rname, name, index, label):
- get_topogen().net[rname].cmd(''' \
+ get_topogen().net[rname].cmd(
+ """ \
vtysh -c "conf t" \
-c "segment-routing" \
-c "traffic-eng" \
- -c "segment-list ''' + name + '''" \
- -c "index ''' + str(index) + ''' mpls label ''' + str(label) + '''"''')
+ -c "segment-list """
+ + name
+ + """" \
+ -c "index """
+ + str(index)
+ + """ mpls label """
+ + str(label)
+ + '''"'''
+ )
+
def delete_segment(rname, name, index):
- get_topogen().net[rname].cmd(''' \
+ get_topogen().net[rname].cmd(
+ """ \
vtysh -c "conf t" \
-c "segment-routing" \
-c "traffic-eng" \
- -c "segment-list ''' + name + '''" \
- -c "no index ''' + str(index) + '''"''')
+ -c "segment-list """
+ + name
+ + """" \
+ -c "no index """
+ + str(index)
+ + '''"'''
+ )
+
def create_sr_policy(rname, endpoint, bsid):
- get_topogen().net[rname].cmd(''' \
+ get_topogen().net[rname].cmd(
+ """ \
vtysh -c "conf t" \
-c "segment-routing" \
-c "traffic-eng" \
- -c "policy color 1 endpoint ''' + endpoint + '''" \
+ -c "policy color 1 endpoint """
+ + endpoint
+ + """" \
-c "name default" \
- -c "binding-sid ''' + str(bsid) + '''"''')
+ -c "binding-sid """
+ + str(bsid)
+ + '''"'''
+ )
+
def delete_sr_policy(rname, endpoint):
- get_topogen().net[rname].cmd(''' \
+ get_topogen().net[rname].cmd(
+ """ \
vtysh -c "conf t" \
-c "segment-routing" \
-c "traffic-eng" \
- -c "no policy color 1 endpoint ''' + endpoint + '''"''')
+ -c "no policy color 1 endpoint """
+ + endpoint
+ + '''"'''
+ )
+
def create_prefix_sid(rname, prefix, sid):
- get_topogen().net[rname].cmd(''' \
+ get_topogen().net[rname].cmd(
+ """ \
vtysh -c "conf t" \
-c "router isis 1" \
- -c "segment-routing prefix ''' + prefix + " index " + str(sid) + '''"''')
+ -c "segment-routing prefix """
+ + prefix
+ + " index "
+ + str(sid)
+ + '''"'''
+ )
+
def delete_prefix_sid(rname, prefix):
- get_topogen().net[rname].cmd(''' \
+ get_topogen().net[rname].cmd(
+ ''' \
vtysh -c "conf t" \
-c "router isis 1" \
- -c "no segment-routing prefix "''' + prefix)
+ -c "no segment-routing prefix "'''
+ + prefix
+ )
+
#
# Step 1
def test_srte_init_step1():
setup_testcase("Test (step 1): wait for IS-IS convergence / label distribution")
- for rname in ['rt1', 'rt6']:
- cmp_json_output(rname,
- "show mpls table json",
- "step1/show_mpls_table_without_candidate.ref")
+ for rname in ["rt1", "rt6"]:
+ cmp_json_output(
+ rname, "show mpls table json", "step1/show_mpls_table_without_candidate.ref"
+ )
+
def test_srte_add_candidate_check_mpls_table_step1():
setup_testcase("Test (step 1): check MPLS table regarding the added Candidate Path")
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
- add_candidate_path(rname, endpoint, 100, 'default')
- cmp_json_output(rname,
- "show mpls table json",
- "step1/show_mpls_table_with_candidate.ref")
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
+ add_candidate_path(rname, endpoint, 100, "default")
+ cmp_json_output(
+ rname, "show mpls table json", "step1/show_mpls_table_with_candidate.ref"
+ )
delete_candidate_path(rname, endpoint, 100)
+
def test_srte_reinstall_sr_policy_check_mpls_table_step1():
- setup_testcase("Test (step 1): check MPLS table after the SR Policy was removed and reinstalled")
+ setup_testcase(
+ "Test (step 1): check MPLS table after the SR Policy was removed and reinstalled"
+ )
- for rname, endpoint, bsid in [('rt1', '6.6.6.6', 1111), ('rt6', '1.1.1.1', 6666)]:
- add_candidate_path(rname, endpoint, 100, 'default')
+ for rname, endpoint, bsid in [("rt1", "6.6.6.6", 1111), ("rt6", "1.1.1.1", 6666)]:
+ add_candidate_path(rname, endpoint, 100, "default")
delete_sr_policy(rname, endpoint)
- cmp_json_output(rname,
- "show mpls table json",
- "step1/show_mpls_table_without_candidate.ref")
+ cmp_json_output(
+ rname, "show mpls table json", "step1/show_mpls_table_without_candidate.ref"
+ )
create_sr_policy(rname, endpoint, bsid)
- add_candidate_path(rname, endpoint, 100, 'default')
- cmp_json_output(rname,
- "show mpls table json",
- "step1/show_mpls_table_with_candidate.ref")
+ add_candidate_path(rname, endpoint, 100, "default")
+ cmp_json_output(
+ rname, "show mpls table json", "step1/show_mpls_table_with_candidate.ref"
+ )
delete_candidate_path(rname, endpoint, 100)
+
#
# Step 2
#
def test_srte_bare_policy_step2():
setup_testcase("Test (step 2): bare SR Policy should not be operational")
- for rname in ['rt1', 'rt6']:
- cmp_json_output_exact(rname,
- "show yang operational-data /frr-pathd:pathd pathd",
- "step2/show_operational_data.ref")
+ for rname in ["rt1", "rt6"]:
+ cmp_json_output_exact(
+ rname,
+ "show yang operational-data /frr-pathd:pathd pathd",
+ "step2/show_operational_data.ref",
+ )
+
def test_srte_add_candidate_check_operational_data_step2():
- setup_testcase("Test (step 2): add single Candidate Path, SR Policy should be operational")
+ setup_testcase(
+ "Test (step 2): add single Candidate Path, SR Policy should be operational"
+ )
+
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
+ add_candidate_path(rname, endpoint, 100, "default")
+ cmp_json_output(
+ rname,
+ "show yang operational-data /frr-pathd:pathd pathd",
+ "step2/show_operational_data_with_candidate.ref",
+ )
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
- add_candidate_path(rname, endpoint, 100, 'default')
- cmp_json_output(rname,
- "show yang operational-data /frr-pathd:pathd pathd",
- "step2/show_operational_data_with_candidate.ref")
def test_srte_config_remove_candidate_check_operational_data_step2():
- setup_testcase("Test (step 2): remove single Candidate Path, SR Policy should not be operational anymore")
+ setup_testcase(
+ "Test (step 2): remove single Candidate Path, SR Policy should not be operational anymore"
+ )
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
delete_candidate_path(rname, endpoint, 100)
- cmp_json_output_exact(rname,
- "show yang operational-data /frr-pathd:pathd pathd",
- "step2/show_operational_data.ref")
+ cmp_json_output_exact(
+ rname,
+ "show yang operational-data /frr-pathd:pathd pathd",
+ "step2/show_operational_data.ref",
+ )
+
#
# Step 3
def test_srte_add_two_candidates_step3():
setup_testcase("Test (step 3): second Candidate Path has higher Priority")
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
- for pref, cand_name in [('100', 'first'), ('200', 'second')]:
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
+ for pref, cand_name in [("100", "first"), ("200", "second")]:
add_candidate_path(rname, endpoint, pref, cand_name)
- cmp_json_output(rname,
- "show yang operational-data /frr-pathd:pathd pathd",
- "step3/show_operational_data_with_two_candidates.ref")
+ cmp_json_output(
+ rname,
+ "show yang operational-data /frr-pathd:pathd pathd",
+ "step3/show_operational_data_with_two_candidates.ref",
+ )
# cleanup
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
- for pref in ['100', '200']:
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
+ for pref in ["100", "200"]:
delete_candidate_path(rname, endpoint, pref)
+
def test_srte_add_two_candidates_with_reverse_priority_step3():
setup_testcase("Test (step 3): second Candidate Path has lower Priority")
# Use reversed priorities here
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
- for pref, cand_name in [('200', 'first'), ('100', 'second')]:
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
+ for pref, cand_name in [("200", "first"), ("100", "second")]:
add_candidate_path(rname, endpoint, pref, cand_name)
- cmp_json_output(rname,
- "show yang operational-data /frr-pathd:pathd pathd",
- "step3/show_operational_data_with_two_candidates.ref")
+ cmp_json_output(
+ rname,
+ "show yang operational-data /frr-pathd:pathd pathd",
+ "step3/show_operational_data_with_two_candidates.ref",
+ )
# cleanup
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
- for pref in ['100', '200']:
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
+ for pref in ["100", "200"]:
delete_candidate_path(rname, endpoint, pref)
+
def test_srte_remove_best_candidate_step3():
setup_testcase("Test (step 3): delete the Candidate Path with higher priority")
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
- for pref, cand_name in [('100', 'first'), ('200', 'second')]:
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
+ for pref, cand_name in [("100", "first"), ("200", "second")]:
add_candidate_path(rname, endpoint, pref, cand_name)
# Delete candidate with higher priority
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
delete_candidate_path(rname, endpoint, 200)
# Candidate with lower priority should get active now
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
- cmp_json_output(rname,
- "show yang operational-data /frr-pathd:pathd pathd",
- "step3/show_operational_data_with_single_candidate.ref")
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
+ cmp_json_output(
+ rname,
+ "show yang operational-data /frr-pathd:pathd pathd",
+ "step3/show_operational_data_with_single_candidate.ref",
+ )
# cleanup
delete_candidate_path(rname, endpoint, 100)
+
#
# Step 4
#
def test_srte_change_segment_list_check_mpls_table_step4():
setup_testcase("Test (step 4): check MPLS table for changed Segment List")
- for rname, endpoint in [('rt1', '6.6.6.6'), ('rt6', '1.1.1.1')]:
- add_candidate_path(rname, endpoint, 100, 'default')
- # now change the segment list name
- add_candidate_path(rname, endpoint, 100, 'default', 'test')
- cmp_json_output(rname,
- "show mpls table json",
- "step4/show_mpls_table.ref")
+ for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
+ add_candidate_path(rname, endpoint, 100, "default")
+ # now change the segment list name
+ add_candidate_path(rname, endpoint, 100, "default", "test")
+ cmp_json_output(rname, "show mpls table json", "step4/show_mpls_table.ref")
delete_candidate_path(rname, endpoint, 100)
+
def test_srte_segment_list_add_segment_check_mpls_table_step4():
- setup_testcase("Test (step 4): check MPLS table for added (then changed and finally deleted) segment")
+ setup_testcase(
+ "Test (step 4): check MPLS table for added (then changed and finally deleted) segment"
+ )
- add_candidate_path('rt1', '6.6.6.6', 100, 'default', 'test')
+ add_candidate_path("rt1", "6.6.6.6", 100, "default", "test")
# first add a new segment
- add_segment('rt1', 'test', 25, 16050)
- cmp_json_output('rt1',
- "show mpls table json",
- "step4/show_mpls_table_add_segment.ref")
+ add_segment("rt1", "test", 25, 16050)
+ cmp_json_output(
+ "rt1", "show mpls table json", "step4/show_mpls_table_add_segment.ref"
+ )
# ... then change it ...
- add_segment('rt1', 'test', 25, 16030)
- cmp_json_output('rt1',
- "show mpls table json",
- "step4/show_mpls_table_change_segment.ref")
+ add_segment("rt1", "test", 25, 16030)
+ cmp_json_output(
+ "rt1", "show mpls table json", "step4/show_mpls_table_change_segment.ref"
+ )
# ... and finally delete it
- delete_segment('rt1', 'test', 25)
- cmp_json_output('rt1',
- "show mpls table json",
- "step4/show_mpls_table.ref")
- delete_candidate_path('rt1', '6.6.6.6', 100)
+ delete_segment("rt1", "test", 25)
+ cmp_json_output("rt1", "show mpls table json", "step4/show_mpls_table.ref")
+ delete_candidate_path("rt1", "6.6.6.6", 100)
+
#
# Step 5
# Checking the nexthop using a single SR Policy and a Candidate Path with configured route-map
#
def test_srte_route_map_with_sr_policy_check_nextop_step5():
- setup_testcase("Test (step 5): recursive nexthop learned through BGP neighbour should be aligned with SR Policy from route-map")
+ setup_testcase(
+ "Test (step 5): recursive nexthop learned through BGP neighbour should be aligned with SR Policy from route-map"
+ )
# (re-)build the SR Policy two times to ensure that reinstalling still works
- for i in [1,2]:
- cmp_json_output('rt1',
- "show ip route bgp json",
- "step5/show_ip_route_bgp_inactive_srte.ref")
+ for i in [1, 2]:
+ cmp_json_output(
+ "rt1", "show ip route bgp json", "step5/show_ip_route_bgp_inactive_srte.ref"
+ )
- delete_sr_policy('rt1', '6.6.6.6')
- cmp_json_output('rt1',
- "show ip route bgp json",
- "step5/show_ip_route_bgp_inactive_srte.ref")
+ delete_sr_policy("rt1", "6.6.6.6")
+ cmp_json_output(
+ "rt1", "show ip route bgp json", "step5/show_ip_route_bgp_inactive_srte.ref"
+ )
- create_sr_policy('rt1', '6.6.6.6', 1111)
- cmp_json_output('rt1',
- "show ip route bgp json",
- "step5/show_ip_route_bgp_inactive_srte.ref")
+ create_sr_policy("rt1", "6.6.6.6", 1111)
+ cmp_json_output(
+ "rt1", "show ip route bgp json", "step5/show_ip_route_bgp_inactive_srte.ref"
+ )
+
+ add_candidate_path("rt1", "6.6.6.6", 100, "default")
+ cmp_json_output(
+ "rt1", "show ip route bgp json", "step5/show_ip_route_bgp_active_srte.ref"
+ )
- add_candidate_path('rt1', '6.6.6.6', 100, 'default')
- cmp_json_output('rt1',
- "show ip route bgp json",
- "step5/show_ip_route_bgp_active_srte.ref")
+ delete_candidate_path("rt1", "6.6.6.6", 100)
- delete_candidate_path('rt1', '6.6.6.6', 100)
def test_srte_route_map_with_sr_policy_reinstall_prefix_sid_check_nextop_step5():
- setup_testcase("Test (step 5): remove and re-install prefix SID on fist path element and check SR Policy activity")
+ setup_testcase(
+ "Test (step 5): remove and re-install prefix SID on first path element and check SR Policy activity"
+ )
# first add a candidate path so the SR Policy is active
- add_candidate_path('rt1', '6.6.6.6', 100, 'default')
- cmp_json_output('rt1',
- "show yang operational-data /frr-pathd:pathd pathd",
- "step5/show_operational_data_active.ref")
+ add_candidate_path("rt1", "6.6.6.6", 100, "default")
+ cmp_json_output(
+ "rt1",
+ "show yang operational-data /frr-pathd:pathd pathd",
+ "step5/show_operational_data_active.ref",
+ )
# delete prefix SID from first element of the configured path and check
# if the SR Policy is inactive since the label can't be resolved anymore
- delete_prefix_sid('rt5', "5.5.5.5/32")
- cmp_json_output('rt1',
- "show yang operational-data /frr-pathd:pathd pathd",
- "step5/show_operational_data_inactive.ref")
- cmp_json_output('rt1',
- "show ip route bgp json",
- "step5/show_ip_route_bgp_inactive_srte.ref")
+ delete_prefix_sid("rt5", "5.5.5.5/32")
+ cmp_json_output(
+ "rt1",
+ "show yang operational-data /frr-pathd:pathd pathd",
+ "step5/show_operational_data_inactive.ref",
+ )
+ cmp_json_output(
+ "rt1", "show ip route bgp json", "step5/show_ip_route_bgp_inactive_srte.ref"
+ )
# re-create the prefix SID and check if the SR Policy is active
- create_prefix_sid('rt5', "5.5.5.5/32", 50)
- cmp_json_output('rt1',
- "show yang operational-data /frr-pathd:pathd pathd",
- "step5/show_operational_data_active.ref")
- cmp_json_output('rt1',
- "show ip route bgp json",
- "step5/show_ip_route_bgp_active_srte.ref")
+ create_prefix_sid("rt5", "5.5.5.5/32", 50)
+ cmp_json_output(
+ "rt1",
+ "show yang operational-data /frr-pathd:pathd pathd",
+ "step5/show_operational_data_active.ref",
+ )
+ cmp_json_output(
+ "rt1", "show ip route bgp json", "step5/show_ip_route_bgp_active_srte.ref"
+ )
+
# Memory leak test template
def test_memory_leak():
"Run the memory leak test and report results."
tgen = get_topogen()
if not tgen.is_memleak_enabled():
- pytest.skip('Memory leak test/report is disabled')
+ pytest.skip("Memory leak test/report is disabled")
tgen.report_memory_leaks()
-if __name__ == '__main__':
+
+if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
pytestmark = [pytest.mark.isisd]
+
class TemplateTopo(Topo):
"Test topology builder"
f_in.close()
f_out.close()
+
def setup_module(mod):
"Sets up the pytest environment"
tgen = Topogen(TemplateTopo, mod.__name__)
pytestmark = [pytest.mark.isisd]
+
class ISISTopo1(Topo):
"Simple two layer ISIS vrf topology"
pytestmark = [pytest.mark.isisd]
+
class ISISTopo1(Topo):
"Simple two layer ISIS topology"
pytestmark = [pytest.mark.ldpd, pytest.mark.ospfd]
+
class TemplateTopo(Topo):
"Test topology builder"
pytestmark = [pytest.mark.ldpd, pytest.mark.ospfd]
+
class TemplateTopo(Topo):
"Test topology builder"
pytestmark = [pytest.mark.ldpd, pytest.mark.ospfd]
+
class TemplateTopo(Topo):
"Test topology builder"
pytestmark = [pytest.mark.ldpd, pytest.mark.ospfd]
+
class TemplateTopo(Topo):
"Test topology builder"
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
-def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None,
-aspath=None, multi_nh=None):
+def verify_bgp_rib(
+ tgen, addr_type, dut, input_dict, next_hop=None, aspath=None, multi_nh=None
+):
"""
This API is to verify whether bgp rib has any
matching route for a nexthop.
if not isinstance(next_hop, list):
next_hop = [next_hop]
list1 = next_hop
- found_hops = [rib_r["ip"] for rib_r in
- rib_routes_json["routes"][
- st_rt][0]["nexthops"]]
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in rib_routes_json["routes"][st_rt][0][
+ "nexthops"
+ ]
+ ]
list2 = found_hops
- missing_list_of_nexthops = \
- set(list2).difference(list1)
- additional_nexthops_in_required_nhs = \
- set(list1).difference(list2)
+ missing_list_of_nexthops = set(list2).difference(list1)
+ additional_nexthops_in_required_nhs = set(
+ list1
+ ).difference(list2)
if list2:
if additional_nexthops_in_required_nhs:
- logger.info("Missing nexthop %s for route"\
- " %s in RIB of router %s\n", \
- additional_nexthops_in_required_nhs, \
- st_rt, dut)
- errormsg=("Nexthop {} is Missing for "\
- "route {} in RIB of router {}\n".format(
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
- st_rt, dut))
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
return errormsg
else:
nh_found = True
return result
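For orientation, a hypothetical call of verify_bgp_rib(); the router name, prefix and nexthop are invented, and the input_dict shape is assumed to follow the static_routes dictionaries used elsewhere in these tests:

    # Hypothetical usage sketch; none of these values come from a real topology.
    input_dict = {"r1": {"static_routes": [{"network": "10.0.20.1/32"}]}}
    result = verify_bgp_rib(tgen, "ipv4", "r1", input_dict, next_hop=["10.0.0.2"])
    assert result is True, "BGP RIB check failed: {}".format(result)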
-def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True, ospf="ospf"):
+def __create_ospf_global(
+ tgen, input_dict, router, build=False, load_config=True, ospf="ospf"
+):
"""
Helper API to create ospf global configuration.
logger.info("[DUT: %s]: Verifying PIM interface status:", dut)
rnode = tgen.routers()[dut]
- show_ip_pim_interface_json = rnode.\
- vtysh_cmd("show ip pim interface json", isjson=True)
+ show_ip_pim_interface_json = rnode.vtysh_cmd(
+ "show ip pim interface json", isjson=True
+ )
- logger.info("show_ip_pim_interface_json: \n %s",
- show_ip_pim_interface_json)
+ logger.info("show_ip_pim_interface_json: \n %s", show_ip_pim_interface_json)
if interface_ip:
if interface in show_ip_pim_interface_json:
pim_intf_json = show_ip_pim_interface_json[interface]
if pim_intf_json["address"] != interface_ip:
- errormsg = ("[DUT %s]: PIM interface "
- "ip is not correct "
- "[FAILED]!! Expected : %s, Found : %s"
- %(dut, pim_intf_json["address"],interface_ip))
+ errormsg = (
+ "[DUT %s]: PIM interface "
+ "ip is not correct "
+ "[FAILED]!! Expected : %s, Found : %s"
+ % (dut, pim_intf_json["address"], interface_ip)
+ )
return errormsg
else:
- logger.info("[DUT %s]: PIM interface "
- "ip is correct "
- "[Passed]!! Expected : %s, Found : %s"
- %(dut, pim_intf_json["address"],interface_ip))
+ logger.info(
+ "[DUT %s]: PIM interface "
+ "ip is correct "
+ "[Passed]!! Expected : %s, Found : %s"
+ % (dut, pim_intf_json["address"], interface_ip)
+ )
return True
else:
for destLink, data in topo["routers"][dut]["links"].items():
pim_intf_ip = data["ipv4"].split("/")[0]
if pim_interface in show_ip_pim_interface_json:
- pim_intf_json = show_ip_pim_interface_json\
- [pim_interface]
+ pim_intf_json = show_ip_pim_interface_json[pim_interface]
# Verifying PIM interface
- if pim_intf_json["address"] != pim_intf_ip and \
- pim_intf_json["state"] != "up":
- errormsg = ("[DUT %s]: PIM interface: %s "
- "PIM interface ip: %s, status check "
- "[FAILED]!! Expected : %s, Found : %s"
- %(dut, pim_interface, pim_intf_ip,
- pim_interface, pim_intf_json["state"]))
+ if (
+ pim_intf_json["address"] != pim_intf_ip
+ and pim_intf_json["state"] != "up"
+ ):
+ errormsg = (
+ "[DUT %s]: PIM interface: %s "
+ "PIM interface ip: %s, status check "
+ "[FAILED]!! Expected : %s, Found : %s"
+ % (
+ dut,
+ pim_interface,
+ pim_intf_ip,
+ pim_interface,
+ pim_intf_json["state"],
+ )
+ )
return errormsg
- logger.info("[DUT %s]: PIM interface: %s, "
- "interface ip: %s, status: %s"
- " [PASSED]!!",
- dut, pim_interface, pim_intf_ip,
- pim_intf_json["state"])
+ logger.info(
+ "[DUT %s]: PIM interface: %s, "
+ "interface ip: %s, status: %s"
+ " [PASSED]!!",
+ dut,
+ pim_interface,
+ pim_intf_ip,
+ pim_intf_json["state"],
+ )
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
if router != dut:
continue
- logger.info("[DUT: %s]: Verifying PIM interface status:",
- dut)
+ logger.info("[DUT: %s]: Verifying PIM interface status:", dut)
rnode = tgen.routers()[dut]
- show_ip_igmp_interface_json = \
- run_frr_cmd(rnode, "show ip igmp interface json", isjson=True)
+ show_ip_igmp_interface_json = run_frr_cmd(
+ rnode, "show ip igmp interface json", isjson=True
+ )
- if igmp_iface in show_ip_igmp_interface_json:
+ if igmp_iface in show_ip_igmp_interface_json:
igmp_intf_json = show_ip_igmp_interface_json[igmp_iface]
# Verifying igmp interface
- if igmp_intf_json["address"] != interface_ip:
- errormsg = ("[DUT %s]: igmp interface ip is not correct "
- "[FAILED]!! Expected : %s, Found : %s"
- %(dut, igmp_intf_json["address"], interface_ip))
+ if igmp_intf_json["address"] != interface_ip:
+ errormsg = (
+ "[DUT %s]: igmp interface ip is not correct "
+ "[FAILED]!! Expected : %s, Found : %s"
+ % (dut, igmp_intf_json["address"], interface_ip)
+ )
return errormsg
- logger.info("[DUT %s]: igmp interface: %s, "
- "interface ip: %s"
- " [PASSED]!!",
- dut, igmp_iface, interface_ip)
+ logger.info(
+ "[DUT %s]: igmp interface: %s, " "interface ip: %s" " [PASSED]!!",
+ dut,
+ igmp_iface,
+ interface_ip,
+ )
else:
- errormsg = ("[DUT %s]: igmp interface: %s "
- "igmp interface ip: %s, is not present "
- %(dut, igmp_iface, interface_ip))
+ errormsg = (
+ "[DUT %s]: igmp interface: %s "
+ "igmp interface ip: %s, is not present "
+ % (dut, igmp_iface, interface_ip)
+ )
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
def _get_snmp_oid(snmp_output):
tokens = snmp_output.strip().split()
-# if len(tokens) > 5:
-# return None
-
+ # if len(tokens) > 5:
+ # return None
# third token is the value of the object
- return tokens[0].split('.',1)[1]
+ return tokens[0].split(".", 1)[1]
def _parse_multiline(self, snmp_output):
results = snmp_output.strip().split("\r\n")
print("FAIL: missing oid key {}".format(oid))
return False
if results_dict[oid] != values[index]:
- print("FAIL{} {} |{}| == |{}|".format(oid, index, results_dict[oid], values[index]))
+ print(
+ "FAIL{} {} |{}| == |{}|".format(
+ oid, index, results_dict[oid], values[index]
+ )
+ )
return False
index += 1
return True
"f1": {
"static_routes": [
{"network": [BSR1_ADDR, CRP], "next_hop": "blackhole", "delete": True},
- {"network": BSR1_ADDR, "next_hop": NEXT_HOP1}
+ {"network": BSR1_ADDR, "next_hop": NEXT_HOP1},
]
}
}
"l1": {
"pim": {
"rp": [
- {"rp_addr": "33.33.33.33", "group_addr_range": ["225.1.1.1/32"],}
+ {
+ "rp_addr": "33.33.33.33",
+ "group_addr_range": ["225.1.1.1/32"],
+ }
]
}
}
assert (
rp_add1 == rp2[group]
), "Testcase {} :Failed \n Error : rp expected {} rp received {}".format(
- tc_name, rp_add1,
+ tc_name,
+ rp_add1,
)
# Verify if that rp is installed
# Add back route for RP to make it reachable
step("Add back route for RP to make it reachable")
input_dict = {
- "l1": {"static_routes": [{"network": rp_ip, "next_hop": next_hop_lhr,}]}
+ "l1": {
+ "static_routes": [
+ {
+ "network": rp_ip,
+ "next_hop": next_hop_lhr,
+ }
+ ]
+ }
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
assert (
rp_add1 == rp2[group]
), "Testcase {} :Failed \n Error : rp expected {} rp received {}".format(
- tc_name, rp_add1,
+ tc_name,
+ rp_add1,
)
# Verify if that rp is installed
clear_ip_mroute,
clear_ip_pim_interface_traffic,
verify_igmp_config,
- clear_ip_mroute_verify
+ clear_ip_mroute_verify,
)
from lib.topolog import logger
from lib.topojson import build_topo_from_json, build_config_from_json
data["src_address"],
_IGMP_JOIN_RANGE,
data["iif"],
- data["oil"]
+ data["oil"],
)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
done_flag = False
for retry in range(1, 11):
- result = verify_upstream_iif(tgen, "l1", "Unknown", source, IGMP_JOIN_RANGE_2,
- expected=False)
+ result = verify_upstream_iif(
+ tgen, "l1", "Unknown", source, IGMP_JOIN_RANGE_2, expected=False
+ )
if result is not True:
done_flag = True
else:
_IGMP_JOIN_RANGE,
data["iif"],
data["oil"],
- expected=False
+ expected=False,
)
if result is not True:
done_flag = True
"f1-i8-eth2",
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n mroutes are"
- " still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n mroutes are" " still present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behavior: {}".format(result))
input_traffic = {"l1": {"traffic_sent": [intf_l1_i1]}}
result = verify_multicast_traffic(tgen, input_traffic, expected=False)
- assert result is not True, (
- "Testcase {} : Failed \n "
- " Traffic is not stopped yet \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " " Traffic is not stopped yet \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
result = verify_igmp_groups(
tgen, dut, intf_l1_i1, IGMP_JOIN_RANGE_1, expected=False
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "IGMP groups are not deleted \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "IGMP groups are not deleted \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "mroutes are still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroutes are still present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
input_traffic = {"f1": {"traffic_sent": [intf_f1_i8]}}
result = verify_multicast_traffic(tgen, input_traffic, expected=False)
- assert result is not True, (
- "Testcase {} : Failed \n "
- " Traffic is not stopped yet \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " " Traffic is not stopped yet \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
result = verify_igmp_groups(
tgen, dut, intf_f1_i8, IGMP_JOIN_RANGE_1, expected=False
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "IGMP groups are not deleted \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "IGMP groups are not deleted \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "mroutes are still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroutes are still present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "mroutes are still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroutes are still present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "mroutes are still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroutes are still present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
result = verify_ip_mroutes(
tgen, "f1", source_i2, IGMP_JOIN_RANGE_1, intf_f1_i2, intf_f1_r2, expected=False
)
- assert result is not True, (
- "Testcase {} : Failed \n mroutes are"
- " still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n mroutes are" " still present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behavior: {}".format(result))
input_dict_2 = {
"l1": {
"igmp": {
- "interfaces": {intf_l1_i1: {"igmp": {"version": "2", "delete": True,}}}
+ "interfaces": {
+ intf_l1_i1: {
+ "igmp": {
+ "version": "2",
+ "delete": True,
+ }
+ }
+ }
}
}
}
dut = "l1"
interface = topo["routers"]["l1"]["links"]["i1"]["interface"]
result = verify_igmp_groups(tgen, dut, interface, IGMP_JOIN_RANGE_1, expected=False)
- assert result is not True, (
- "Testcase {} : Failed \n Groups are not"
- " present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Groups are not" " present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
input_dict_2 = {
"l1": {
"igmp": {
- "interfaces": {intf_l1_i1: {"igmp": {"version": "2", "delete": True,}}}
+ "interfaces": {
+ intf_l1_i1: {
+ "igmp": {
+ "version": "2",
+ "delete": True,
+ }
+ }
+ }
}
}
}
dut = "l1"
interface = topo["routers"]["l1"]["links"]["i1"]["interface"]
result = verify_igmp_groups(tgen, dut, interface, IGMP_JOIN_RANGE_1, expected=False)
- assert result is not True, (
- "Testcase {} : Failed \n Groups are not"
- " present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Groups are not" " present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
input_dict_2 = {
"l1": {
"igmp": {
- "interfaces": {intf_l1_i1: {"igmp": {"version": "2", "delete": True,}}}
+ "interfaces": {
+ intf_l1_i1: {
+ "igmp": {
+ "version": "2",
+ "delete": True,
+ }
+ }
+ }
}
}
}
result = verify_ip_mroutes(
tgen, dut, source, IGMP_JOIN_RANGE_1, iif, oil, expected=False
)
- assert result is not True, (
- "Testcase {} : Failed \n routes are still"
- " present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n routes are still" " present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
input_dict_2 = {
"l1": {
"igmp": {
- "interfaces": {intf_l1_i1: {"igmp": {"version": "2", "delete": True,}}}
+ "interfaces": {
+ intf_l1_i1: {
+ "igmp": {
+ "version": "2",
+ "delete": True,
+ }
+ }
+ }
}
}
}
)
result = verify_igmp_config(tgen, input_dict_1, expected=False)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "IGMP interface is not removed \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "IGMP interface is not removed \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "mroute still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroute still present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "mroute still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "mroute still present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
IGMP_JOIN_RANGE_1,
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "upstream still present \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "upstream still present \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_1, "Unknown", rp_address, SOURCE, expected=False
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "RP iif is not updated \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "RP iif is not updated \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_1, "Unknown", rp_address, SOURCE, expected=False
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "RP iif is not updated \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "RP iif is not updated \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_1, "Unknown", rp_address, SOURCE, expected=False
)
- assert result is not True, (
- "Testcase {} : Failed \n "
- "RP iif is not updated \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "RP iif is not updated \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
"l1": {
"igmp": {
"interfaces": {
- "l1-i1-eth1": {"igmp": {"version": "2", "delete": True,}}
+ "l1-i1-eth1": {
+ "igmp": {
+ "version": "2",
+ "delete": True,
+ }
+ }
}
}
}
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n"
- "mroutes are cleared \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n" "mroutes are cleared \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n"
- " mroutes are cleared \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n" " mroutes are cleared \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
result = verify_ip_mroutes(
tgen, dut, src_address, _IGMP_JOIN_RANGE, iif, oil, expected=False
)
- assert result is not True, (
- "Testcase {} : Failed \n"
- " mroutes are cleared \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n" " mroutes are cleared \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n"
- " mroutes are cleared \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n" " mroutes are cleared \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n"
- " mroutes are cleared \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n" " mroutes are cleared \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
data["oil"],
expected=False,
)
- assert result is not True, (
- "Testcase {} : Failed \n"
- " mroutes are cleared \n Error: {}".format(tc_name, result)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n" " mroutes are cleared \n Error: {}".format(
+ tc_name, result
)
logger.info("Expected Behaviour: {}".format(result))
intf_c1_l1 = topo["routers"]["c1"]["links"]["l1"]["interface"]
step("verify before stats on C1")
- state_dict = {"c1": {intf_c1_l1: ["helloTx", "helloRx"],}}
+ state_dict = {
+ "c1": {
+ intf_c1_l1: ["helloTx", "helloRx"],
+ }
+ }
c1_state_before = verify_pim_interface_traffic(tgen, state_dict)
assert isinstance(
), "Testcase{} : Failed Error: {}" "stats incremented".format(tc_name, result)
step("verify before stats on l1")
- l1_state_dict = {"l1": {intf_l1_c1: ["helloTx", "helloRx"],}}
+ l1_state_dict = {
+ "l1": {
+ intf_l1_c1: ["helloTx", "helloRx"],
+ }
+ }
l1_state_before = verify_pim_interface_traffic(tgen, l1_state_dict)
assert isinstance(
l1_state_after = {}
step("verify before stats on C1")
- state_dict = {"c1": {intf_c1_l1: ["helloTx", "helloRx"],}}
+ state_dict = {
+ "c1": {
+ intf_c1_l1: ["helloTx", "helloRx"],
+ }
+ }
c1_state_before = verify_pim_interface_traffic(tgen, state_dict)
assert isinstance(
#
#####################################################
+
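
# A minimal sketch of the before/after counter comparison implied above,
# assuming verify_pim_interface_traffic() returns a nested dict of
# router -> interface -> counter -> value (that return shape, and the sample
# interface name in the usage note, are assumptions for illustration).
def hello_counters_incremented(before, after):
    """Return True only if every captured helloTx/helloRx counter strictly
    increased between the two snapshots."""
    for router, intfs in before.items():
        for intf, counters in intfs.items():
            for name, value in counters.items():
                if after[router][intf][name] <= value:
                    return False
    return True

# hello_counters_incremented(
#     {"c1": {"c1-l1-eth0": {"helloTx": 10, "helloRx": 9}}},
#     {"c1": {"c1-l1-eth0": {"helloTx": 12, "helloRx": 11}}},
# )  # -> True
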
def config_to_send_igmp_join_and_traffic(tgen, tc_name):
"""
API to do pre-configuration to send IGMP join and multicast
input_dict = {
"r1": {
"pim": {
- "rp": [{"rp_addr": "1.0.2.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.2.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
}
}
def test_send_join_on_higher_preffered_rp_p1(request):
"""
- TC_11_P1 : Verify PIM join send towards the higher preferred RP
- TC_12_P1 : Verify PIM prune send towards the lower preferred RP
- TC_13_P1 : Verify RPF interface is updated in mroute (kernel) when higher
- preferred overlapping RP configured
- TC_14_P1 : Verify IIF and OIL in "show ip pim state" updated properly when
- higher preferred overlapping RP configured
- TC_15_P1 : Verify upstream interfaces(IIF) and join state are updated when
- higher preferred overlapping RP is configured
- TC_16_P1 : Verify join is send to lower preferred RP, when higher
- preferred RP gets deleted
- TC_17_P1 : Verify prune is send to higher preferred RP when higher
- preferred RP gets deleted
- TC_18_P1 : Verify RPF interface updated in mroute when higher preferred RP
- gets deleted
- TC_19_P1 : Verify IIF and OIL in "show ip pim state" updated when higher
- preferred overlapping RP is deleted
- TC_20_P1 : Verfiy PIM upstream IIF updated when higher preferred
- overlapping RP deleted
-
- Topology used:
- _______r2
- |
- iperf |
- r0-----r1
- |
- |_______r4
+ TC_11_P1 : Verify PIM join is sent towards the higher preferred RP
+ TC_12_P1 : Verify PIM prune is sent towards the lower preferred RP
+ TC_13_P1 : Verify RPF interface is updated in mroute (kernel) when higher
+ preferred overlapping RP configured
+ TC_14_P1 : Verify IIF and OIL in "show ip pim state" updated properly when
+ higher preferred overlapping RP configured
+ TC_15_P1 : Verify upstream interfaces(IIF) and join state are updated when
+ higher preferred overlapping RP is configured
+ TC_16_P1 : Verify join is sent to lower preferred RP, when higher
+ preferred RP gets deleted
+ TC_17_P1 : Verify prune is sent to higher preferred RP when higher
+ preferred RP gets deleted
+ TC_18_P1 : Verify RPF interface updated in mroute when higher preferred RP
+ gets deleted
+ TC_19_P1 : Verify IIF and OIL in "show ip pim state" updated when higher
+ preferred overlapping RP is deleted
+ TC_20_P1 : Verify PIM upstream IIF updated when higher preferred
+ overlapping RP deleted
+
+ Topology used:
+ _______r2
+ |
+ iperf |
+ r0-----r1
+ |
+ |_______r4
"""
tgen = get_topogen()
input_dict = {
"r4": {
"pim": {
- "rp": [{"rp_addr": "1.0.4.17", "group_addr_range": ["225.1.1.1/32"],}]
+ "rp": [
+ {
+ "rp_addr": "1.0.4.17",
+ "group_addr_range": ["225.1.1.1/32"],
+ }
+ ]
}
}
}
input_dict = {
"r1": {
"pim": {
- "rp": [{"rp_addr": "1.0.1.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.1.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r2": {
"pim": {
- "rp": [{"rp_addr": "1.0.1.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.1.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r3": {
"pim": {
- "rp": [{"rp_addr": "1.0.1.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.1.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r4": {
"pim": {
- "rp": [{"rp_addr": "1.0.1.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.1.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
}
input_dict = {
"r1": {
"pim": {
- "rp": [{"rp_addr": "1.0.1.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.1.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r2": {
"pim": {
- "rp": [{"rp_addr": "1.0.1.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.1.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r3": {
"pim": {
- "rp": [{"rp_addr": "1.0.1.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.1.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r4": {
"pim": {
- "rp": [{"rp_addr": "1.0.1.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.1.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
}
input_dict = {
"r1": {
"pim": {
- "rp": [{"rp_addr": "1.0.3.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.3.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r2": {
"pim": {
- "rp": [{"rp_addr": "1.0.3.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.3.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r3": {
"pim": {
- "rp": [{"rp_addr": "1.0.3.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.3.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r4": {
"pim": {
- "rp": [{"rp_addr": "1.0.3.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.3.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
}
input_dict = {
"r1": {
"pim": {
- "rp": [{"rp_addr": "1.0.3.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.3.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r2": {
"pim": {
- "rp": [{"rp_addr": "1.0.3.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.3.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r3": {
"pim": {
- "rp": [{"rp_addr": "1.0.3.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.3.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
"r4": {
"pim": {
- "rp": [{"rp_addr": "1.0.3.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.3.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
},
}
input_dict = {
"r1": {
"pim": {
- "rp": [{"rp_addr": "1.0.2.17", "group_addr_range": GROUP_RANGE_ALL,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.2.17",
+ "group_addr_range": GROUP_RANGE_ALL,
+ }
+ ]
}
}
}
input_dict = {
"r2": {
"pim": {
- "rp": [{"rp_addr": "1.0.2.17", "group_addr_range": GROUP_RANGE_LIST_1,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.2.17",
+ "group_addr_range": GROUP_RANGE_LIST_1,
+ }
+ ]
}
},
"r4": {
"pim": {
- "rp": [{"rp_addr": "1.0.4.17", "group_addr_range": GROUP_RANGE_LIST_2,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.4.17",
+ "group_addr_range": GROUP_RANGE_LIST_2,
+ }
+ ]
}
},
}
input_dict = {
"r2": {
"pim": {
- "rp": [{"rp_addr": "1.0.2.17", "group_addr_range": GROUP_RANGE_LIST_1,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.2.17",
+ "group_addr_range": GROUP_RANGE_LIST_1,
+ }
+ ]
}
},
"r4": {
"pim": {
- "rp": [{"rp_addr": "1.0.4.17", "group_addr_range": GROUP_RANGE_LIST_2,}]
+ "rp": [
+ {
+ "rp_addr": "1.0.4.17",
+ "group_addr_range": GROUP_RANGE_LIST_2,
+ }
+ ]
}
},
}
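
# A minimal sketch of the overlapping-RP preference being exercised above
# (e.g. 1.0.1.17 covering GROUP_RANGE_ALL while 1.0.4.17 covers only
# 225.1.1.1/32): the RP whose group range most specifically matches the
# group is preferred. The longest-prefix rule, the tie-break, and the
# 224.0.0.0/4 stand-in for GROUP_RANGE_ALL in the usage note are assumptions
# for illustration, not a statement of FRR's exact algorithm.
import ipaddress

def select_rp(group, candidates):
    """Pick the rp_addr whose group range has the longest prefix match for
    `group`; ties broken by the numerically highest RP address (assumed)."""
    grp = ipaddress.ip_address(group)
    best_key, best_rp = None, None
    for rp_addr, group_range in candidates:
        net = ipaddress.ip_network(group_range)
        if grp in net:
            key = (net.prefixlen, ipaddress.ip_address(rp_addr))
            if best_key is None or key > best_key:
                best_key, best_rp = key, rp_addr
    return best_rp

# select_rp("225.1.1.1", [("1.0.1.17", "224.0.0.0/4"),
#                         ("1.0.4.17", "225.1.1.1/32")])  # -> "1.0.4.17"
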
pytestmark = [pytest.mark.ospfd]
+
class OspfSrTopo(Topo):
"Test topology builder"
"step1/show_ip_route_initial.ref",
)
+
def test_ospf_link_protection_step2():
logger.info("Test (step 2): check OSPF link protection")
tgen = get_topogen()
pytest.skip(tgen.errors)
# enable TI-LFA link protection on all interfaces
- tgen.net["rt1"].cmd(
- 'vtysh -c "conf t" -c "router ospf" -c "fast-reroute ti-lfa"'
- )
+ tgen.net["rt1"].cmd('vtysh -c "conf t" -c "router ospf" -c "fast-reroute ti-lfa"')
router_compare_json_output(
"rt1",
"step2/show_ip_route_initial.ref",
)
+
def test_ospf_node_protection_step3():
logger.info("Test (step 3): check OSPF node protection")
tgen = get_topogen()
"step3/show_ip_route_initial.ref",
)
+
# Memory leak test template
def test_memory_leak():
"Run the memory leak test and report results."
tgen.report_memory_leaks()
+
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
pytestmark = [pytest.mark.ospfd]
+
class OSPFTopo(Topo):
"Test topology builder"
pytestmark = [pytest.mark.ospfd]
+
class OSPFTopo(Topo):
"Test topology builder"
% (i, diff)
)
else:
- logger.error(
- "r{} failed - no nhid ref file: {}".format(i, refTableFile)
- )
+ logger.error("r{} failed - no nhid ref file: {}".format(i, refTableFile))
assert False, (
"Linux Kernel IPv6 Routing Table verification failed for router r%s\n"
% (i)
)
+
def test_shutdown_check_stderr():
tgen = get_topogen()
shutdown_bringup_interface,
topo_daemons,
verify_rib,
- stop_router, start_router,
+ stop_router,
+ start_router,
create_static_routes,
start_router_daemons,
- kill_router_daemons
+ kill_router_daemons,
)
-from lib.ospf import (
- verify_ospf_neighbor, verify_ospf_rib,
- create_router_ospf)
+from lib.ospf import verify_ospf_neighbor, verify_ospf_rib, create_router_ospf
from lib.topolog import logger
from lib.topojson import build_topo_from_json, build_config_from_json
topo = None
NETWORK = {
- "ipv4": ["11.0.20.1/32", "11.0.20.2/32", "11.0.20.3/32", "11.0.20.4/32",
- "11.0.20.5/32"]
+ "ipv4": [
+ "11.0.20.1/32",
+ "11.0.20.2/32",
+ "11.0.20.3/32",
+ "11.0.20.4/32",
+ "11.0.20.5/32",
+ ]
}
"""
Topology:
except IOError:
assert False, "Could not read file {}".format(jsonFile)
+
class CreateTopo(Topo):
"""
Test topology builder.
step(
"Create static routes(10.0.20.1/32) in R1 and redistribute "
- "to OSPF using route map.")
+ "to OSPF using route map."
+ )
# Create Static routes
input_dict = {
"r0": {
"static_routes": [
{
- "network": NETWORK['ipv4'][0],
+ "network": NETWORK["ipv4"][0],
"no_of_ip": 5,
- "next_hop": 'Null0',
+ "next_hop": "Null0",
}
]
}
}
result = create_static_routes(tgen, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- ospf_red_r0 = {
- "r0": {
- "ospf": {
- "redistribute": [{
- "redist_type": "static"
- }]
- }
- }
- }
+ ospf_red_r0 = {"r0": {"ospf": {"redistribute": [{"redist_type": "static"}]}}}
result = create_router_ospf(tgen, topo, ospf_red_r0)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify OSPF neighbors after base config is done.")
# Api call verify whether OSPF is converged
ospf_covergence = verify_ospf_neighbor(tgen, topo)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step("Verify that route is advertised to R1.")
- dut = 'r1'
- protocol = 'ospf'
- nh = topo['routers']['r0']['links']['r1']['ipv4'].split('/')[0]
+ dut = "r1"
+ protocol = "ospf"
+ nh = topo["routers"]["r0"]["links"]["r1"]["ipv4"].split("/")[0]
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- result = verify_rib(
- tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Kill OSPFd daemon on R0.")
kill_router_daemons(tgen, "r0", ["ospfd"])
step("Verify OSPF neighbors are down after killing ospfd in R0")
- dut = 'r0'
+ dut = "r0"
# Api call verify whether OSPF is converged
- ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut,
- expected=False)
- assert ospf_covergence is not True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut, expected=False)
+ assert ospf_covergence is not True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step("Verify that route advertised to R1 are deleted from RIB and FIB.")
- dut = 'r1'
- protocol = 'ospf'
+ dut = "r1"
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- expected=False)
+ result = verify_rib(
+ tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
+ )
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step("Bring up OSPFd daemon on R0.")
start_router_daemons(tgen, "r0", ["ospfd"])
step("Verify OSPF neighbors are up after bringing back ospfd in R0")
# Api call verify whether OSPF is converged
ospf_covergence = verify_ospf_neighbor(tgen, topo)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step(
"All the neighbours are up and routes are installed before the"
- " restart. Verify OSPF route table and ip route table.")
- dut = 'r1'
- protocol = 'ospf'
+ " restart. Verify OSPF route table and ip route table."
+ )
+ dut = "r1"
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Kill OSPFd daemon on R1.")
kill_router_daemons(tgen, "r1", ["ospfd"])
step("Verify OSPF neighbors are down after killing ospfd in R1")
- dut = 'r1'
+ dut = "r1"
# Api call verify whether OSPF is converged
- ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut,
- expected=False)
- assert ospf_covergence is not True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut, expected=False)
+ assert ospf_covergence is not True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step("Bring up OSPFd daemon on R1.")
start_router_daemons(tgen, "r1", ["ospfd"])
step("Verify OSPF neighbors are up after bringing back ospfd in R1")
# Api call verify whether OSPF is converged
ospf_covergence = verify_ospf_neighbor(tgen, topo)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step(
"All the neighbours are up and routes are installed before the"
- " restart. Verify OSPF route table and ip route table.")
+ " restart. Verify OSPF route table and ip route table."
+ )
- dut = 'r1'
- protocol = 'ospf'
+ dut = "r1"
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
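
# A minimal sketch of what {"network": NETWORK["ipv4"][0], "no_of_ip": 5,
# "next_hop": "Null0"} is assumed to expand into: five consecutive /32
# blackhole routes, matching the NETWORK list defined earlier
# (11.0.20.1/32 .. 11.0.20.5/32). The expansion rule is an assumption about
# the create_static_routes helper, shown here for illustration only.
import ipaddress

def expand_prefixes(base, count):
    """Generate `count` consecutive prefixes starting at `base`."""
    net = ipaddress.ip_network(base, strict=False)
    size = net.num_addresses
    return [
        "{}/{}".format(net.network_address + i * size, net.prefixlen)
        for i in range(count)
    ]

# expand_prefixes("11.0.20.1/32", 5)
# -> ['11.0.20.1/32', '11.0.20.2/32', '11.0.20.3/32', '11.0.20.4/32', '11.0.20.5/32']
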
step(
"Create static routes(10.0.20.1/32) in R1 and redistribute "
- "to OSPF using route map.")
+ "to OSPF using route map."
+ )
# Create Static routes
input_dict = {
"r0": {
"static_routes": [
{
- "network": NETWORK['ipv4'][0],
+ "network": NETWORK["ipv4"][0],
"no_of_ip": 5,
- "next_hop": 'Null0',
+ "next_hop": "Null0",
}
]
}
}
result = create_static_routes(tgen, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- ospf_red_r0 = {
- "r0": {
- "ospf": {
- "redistribute": [{
- "redist_type": "static"
- }]
- }
- }
- }
+ ospf_red_r0 = {"r0": {"ospf": {"redistribute": [{"redist_type": "static"}]}}}
result = create_router_ospf(tgen, topo, ospf_red_r0)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify OSPF neighbors after base config is done.")
# Api call verify whether OSPF is converged
ospf_covergence = verify_ospf_neighbor(tgen, topo)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step("Verify that route is advertised to R1.")
- dut = 'r1'
- protocol = 'ospf'
+ dut = "r1"
+ protocol = "ospf"
- nh = topo['routers']['r0']['links']['r1']['ipv4'].split('/')[0]
+ nh = topo["routers"]["r0"]["links"]["r1"]["ipv4"].split("/")[0]
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Restart frr on R0")
- stop_router(tgen, 'r0')
- start_router(tgen, 'r0')
+ stop_router(tgen, "r0")
+ start_router(tgen, "r0")
step("Verify OSPF neighbors are up after restarting R0")
# Api call verify whether OSPF is converged
ospf_covergence = verify_ospf_neighbor(tgen, topo)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step(
"All the neighbours are up and routes are installed before the"
- " restart. Verify OSPF route table and ip route table.")
- dut = 'r1'
- protocol = 'ospf'
+ " restart. Verify OSPF route table and ip route table."
+ )
+ dut = "r1"
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Restart frr on R1")
- stop_router(tgen, 'r1')
- start_router(tgen, 'r1')
+ stop_router(tgen, "r1")
+ start_router(tgen, "r1")
step("Verify OSPF neighbors are up after restarting R1")
# Api call verify whether OSPF is converged
ospf_covergence = verify_ospf_neighbor(tgen, topo)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step(
"All the neighbours are up and routes are installed before the"
- " restart. Verify OSPF route table and ip route table.")
- dut = 'r1'
- protocol = 'ospf'
+ " restart. Verify OSPF route table and ip route table."
+ )
+ dut = "r1"
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
step(
"Create static routes(10.0.20.1/32) in R1 and redistribute "
- "to OSPF using route map.")
+ "to OSPF using route map."
+ )
# Create Static routes
input_dict = {
"r0": {
"static_routes": [
{
- "network": NETWORK['ipv4'][0],
+ "network": NETWORK["ipv4"][0],
"no_of_ip": 5,
- "next_hop": 'Null0',
+ "next_hop": "Null0",
}
]
}
}
result = create_static_routes(tgen, input_dict)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- ospf_red_r0 = {
- "r0": {
- "ospf": {
- "redistribute": [{
- "redist_type": "static"
- }]
- }
- }
- }
+ ospf_red_r0 = {"r0": {"ospf": {"redistribute": [{"redist_type": "static"}]}}}
result = create_router_ospf(tgen, topo, ospf_red_r0)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify OSPF neighbors after base config is done.")
# Api call verify whether OSPF is converged
ospf_covergence = verify_ospf_neighbor(tgen, topo)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step("Verify that route is advertised to R1.")
- dut = 'r1'
- protocol = 'ospf'
- nh = topo['routers']['r0']['links']['r1']['ipv4'].split('/')[0]
+ dut = "r1"
+ protocol = "ospf"
+ nh = topo["routers"]["r0"]["links"]["r1"]["ipv4"].split("/")[0]
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Kill staticd daemon on R0.")
kill_router_daemons(tgen, "r0", ["staticd"])
step("Verify that route advertised to R1 are deleted from RIB and FIB.")
- dut = 'r1'
- protocol = 'ospf'
+ dut = "r1"
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- expected=False)
+ result = verify_rib(
+ tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
+ )
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step("Bring up staticd daemon on R0.")
start_router_daemons(tgen, "r0", ["staticd"])
step("Verify OSPF neighbors are up after bringing back ospfd in R0")
# Api call verify whether OSPF is converged
ospf_covergence = verify_ospf_neighbor(tgen, topo)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step(
"All the neighbours are up and routes are installed before the"
- " restart. Verify OSPF route table and ip route table.")
- dut = 'r1'
- protocol = 'ospf'
+ " restart. Verify OSPF route table and ip route table."
+ )
+ dut = "r1"
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Kill staticd daemon on R1.")
kill_router_daemons(tgen, "r1", ["staticd"])
step("Verify OSPF neighbors are up after bringing back ospfd in R1")
# Api call verify whether OSPF is converged
ospf_covergence = verify_ospf_neighbor(tgen, topo)
- assert ospf_covergence is True, ("setup_module :Failed \n Error:"
- " {}".format(ospf_covergence))
+ assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
+ ospf_covergence
+ )
step(
"All the neighbours are up and routes are installed before the"
- " restart. Verify OSPF route table and ip route table.")
+ " restart. Verify OSPF route table and ip route table."
+ )
- dut = 'r1'
- protocol = 'ospf'
+ dut = "r1"
+ protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
- result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
- next_hop=nh)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
-
logger.info("Running setup_module() done")
"links": {
"r3": {
"interface": topo["routers"]["r0"]["links"]["r3"]["interface"],
- "ospf": {
- "area": "0.0.0.0",
- "networkType":"POINTOMULTIPOINT"
- },
+ "ospf": {"area": "0.0.0.0", "networkType": "POINTOMULTIPOINT"},
}
}
}
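
# A minimal sketch, assuming the "networkType": "POINTOMULTIPOINT" setting
# above maps onto interface-level ospf commands pushed through vtysh in the
# same style used elsewhere in these tests; the specific command wording
# ("ip ospf area" / "ip ospf network point-to-multipoint") is an assumption.
def ospf_p2mp_iface_cmd(intf):
    """Build the vtysh one-liner assumed to configure the r0<->r3 link."""
    return (
        'vtysh -c "conf t" -c "interface {}" '
        '-c "ip ospf area 0.0.0.0" '
        '-c "ip ospf network point-to-multipoint"'.format(intf)
    )

# tgen.net["r0"].cmd(ospf_p2mp_iface_cmd(topo["routers"]["r0"]["links"]["r3"]["interface"]))
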
##
#####################################################
+
def setup_module(module):
"Setup topology"
tgen = Topogen(NetworkTopo, module.__name__)
pytestmark = [pytest.mark.pimd]
+
class PIMTopo(Topo):
def build(self, *_args, **_opts):
"Build function"
import os
import pytest
import platform
+
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))
from mininet.topo import Topo
from lib.topogen import Topogen, get_topogen
from lib.topotest import version_cmp
+
# Import topoJson from lib, to create topology and initial configuration
from lib.common_config import (
start_topology,
# Creating configuration from JSON
build_config_from_json(tgen, topo)
- if version_cmp(platform.release(), '4.19') < 0:
- error_msg = ('These tests will not run. (have kernel "{}", '
- 'requires kernel >= 4.19)'.format(platform.release()))
+ if version_cmp(platform.release(), "4.19") < 0:
+ error_msg = (
+ 'These tests will not run. (have kernel "{}", '
+ "requires kernel >= 4.19)".format(platform.release())
+ )
pytest.skip(error_msg)
# Checking BGP convergence
# Creating configuration from JSON
build_config_from_json(tgen, topo)
- if version_cmp(platform.release(), '4.19') < 0:
- error_msg = ('These tests will not run. (have kernel "{}", '
- 'requires kernel >= 4.19)'.format(platform.release()))
+ if version_cmp(platform.release(), "4.19") < 0:
+ error_msg = (
+ 'These tests will not run. (have kernel "{}", '
+ "requires kernel >= 4.19)".format(platform.release())
+ )
pytest.skip(error_msg)
# Checking BGP convergence
for addr_type in ADDR_TYPES:
input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type]}]}}
result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol)
- assert result is True, (
- "Testcase {} : Failed \n"
- "Error: Routes are still present in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n" "Error: Routes are still present in RIB".format(
+ tc_name
)
-
write_test_footer(tc_name)
for addr_type in ADDR_TYPES:
input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type]}]}}
result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol)
- assert result is True, (
- "Testcase {} : Failed \n"
- "Error: Routes are still present in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n" "Error: Routes are still present in RIB".format(
+ tc_name
)
write_test_footer(tc_name)
" value and all the nexthop populated in RIB and FIB again"
)
for addr_type in ADDR_TYPES:
- input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type],}]}}
+ input_dict_4 = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": PREFIX1[addr_type],
+ }
+ ]
+ }
+ }
nh = NEXT_HOP_IP["nh1"][addr_type]
result = verify_rib(
tgen, addr_type, dut, input_dict_4, next_hop=nh, protocol=protocol, fib=True
protocol=protocol,
fib=True,
)
- assert result is True, (
- "Testcase {} : Failed \nError: Route "
- " is missing in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \nError: Route " " is missing in RIB".format(
+ tc_name
)
write_test_footer(tc_name)
pytestmark = [pytest.mark.bgpd, pytest.mark.staticd]
+
class CreateTopo(Topo):
"""
Test CreateTopo - topology 1.
# Creating configuration from JSON
build_config_from_json(tgen, topo)
- if version_cmp(platform.release(), '4.19') < 0:
- error_msg = ('These tests will not run. (have kernel "{}", '
- 'requires kernel >= 4.19)'.format(platform.release()))
+ if version_cmp(platform.release(), "4.19") < 0:
+ error_msg = (
+ 'These tests will not run. (have kernel "{}", '
+ "requires kernel >= 4.19)".format(platform.release())
+ )
pytest.skip(error_msg)
# Checking BGP convergence
# Creating configuration from JSON
build_config_from_json(tgen, topo)
- if version_cmp(platform.release(), '4.19') < 0:
- error_msg = ('These tests will not run. (have kernel "{}", '
- 'requires kernel >= 4.19)'.format(platform.release()))
+ if version_cmp(platform.release(), "4.19") < 0:
+ error_msg = (
+ 'These tests will not run. (have kernel "{}", '
+ "requires kernel >= 4.19)".format(platform.release())
+ )
pytest.skip(error_msg)
# Checking BGP convergence
# Creating configuration from JSON
build_config_from_json(tgen, topo)
- if version_cmp(platform.release(), '4.19') < 0:
- error_msg = ('These tests will not run. (have kernel "{}", '
- 'requires kernel >= 4.19)'.format(platform.release()))
+ if version_cmp(platform.release(), "4.19") < 0:
+ error_msg = (
+ 'These tests will not run. (have kernel "{}", '
+ "requires kernel >= 4.19)".format(platform.release())
+ )
pytest.skip(error_msg)
# Checking BGP convergence
for addr_type in ADDR_TYPES:
input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type]}]}}
result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol)
- assert result is True, (
- "Testcase {} : Failed \n"
- "Error: Routes are still present in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n" "Error: Routes are still present in RIB".format(
+ tc_name
)
step("BGP neighbor remove and add")
for addr_type in ADDR_TYPES:
input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type]}]}}
result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol)
- assert result is True, (
- "Testcase {} : Failed \n"
- "Error: Routes are still present in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n" "Error: Routes are still present in RIB".format(
+ tc_name
)
dut = "r3"
for addr_type in ADDR_TYPES:
input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type]}]}}
result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol)
- assert result is True, (
- "Testcase {} : Failed \n"
- "Error: Routes are still present in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n" "Error: Routes are still present in RIB".format(
+ tc_name
)
step("Remove the redistribute static knob")
for addr_type in ADDR_TYPES:
input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type]}]}}
result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol)
- assert result is True, (
- "Testcase {} : Failed \n"
- "Error: Routes are still present in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n" "Error: Routes are still present in RIB".format(
+ tc_name
)
step("BGP neighbor remove and add")
for addr_type in ADDR_TYPES:
input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type]}]}}
result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol)
- assert result is True, (
- "Testcase {} : Failed \n"
- "Error: Routes are still present in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n" "Error: Routes are still present in RIB".format(
+ tc_name
)
dut = "r3"
for addr_type in ADDR_TYPES:
input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type]}]}}
result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol)
- assert result is True, (
- "Testcase {} : Failed \n"
- "Error: Routes are still present in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n" "Error: Routes are still present in RIB".format(
+ tc_name
)
step("Remove the redistribute static knob")
" value and all the nexthop populated in RIB and FIB again"
)
for addr_type in ADDR_TYPES:
- input_dict_4 = {"r2": {"static_routes": [{"network": PREFIX1[addr_type],}]}}
+ input_dict_4 = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": PREFIX1[addr_type],
+ }
+ ]
+ }
+ }
nh = NEXT_HOP_IP["nh1"][addr_type]
result = verify_rib(
tgen, addr_type, dut, input_dict_4, next_hop=nh, protocol=protocol, fib=True
protocol=protocol,
fib=True,
)
- assert result is True, (
- "Testcase {} : Failed \nError: Route "
- " is missing in RIB".format(tc_name)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \nError: Route " " is missing in RIB".format(
+ tc_name
)
step("Remove the redistribute static knob")
# Creating configuration from JSON
build_config_from_json(tgen, topo)
- if version_cmp(platform.release(), '4.19') < 0:
- error_msg = ('These tests will not run. (have kernel "{}", '
- 'requires kernel >= 4.19)'.format(platform.release()))
+ if version_cmp(platform.release(), "4.19") < 0:
+ error_msg = (
+ 'These tests will not run. (have kernel "{}", '
+ "requires kernel >= 4.19)".format(platform.release())
+ )
pytest.skip(error_msg)
# Checking BGP convergence
)
from lib.topojson import build_topo_from_json, build_config_from_json
from lib.topotest import version_cmp
+
# Reading the data from JSON File for topology creation
jsonFile = "{}/static_routes_topo4_ibgp.json".format(CWD)
try:
pytestmark = [pytest.mark.bgpd, pytest.mark.staticd]
+
class CreateTopo(Topo):
"""
Test CreateTopo - topology 1.
# Creating configuration from JSON
build_config_from_json(tgen, topo)
- if version_cmp(platform.release(), '4.19') < 0:
- error_msg = ('These tests will not run. (have kernel "{}", '
- 'requires kernel >= 4.19)'.format(platform.release()))
+ if version_cmp(platform.release(), "4.19") < 0:
+ error_msg = (
+ 'These tests will not run. (have kernel "{}", '
+ "requires kernel >= 4.19)".format(platform.release())
+ )
pytest.skip(error_msg)
# Checking BGP convergence
router_list = tgen.routers()
for rname, router in router_list.items():
router.load_config(
- TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname)))
+ TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
+ )
router.load_config(
- TopoRouter.RD_SHARP, os.path.join(CWD, "{}/sharpd.conf".format(rname)))
+ TopoRouter.RD_SHARP, os.path.join(CWD, "{}/sharpd.conf".format(rname))
+ )
# Initialize all routers.
tgen.start_router()
_, result = topotest.run_and_expect(test_func, None, count=2, wait=0.5)
assert result is None, '"r1" JSON output mismatches'
+
def test_route_map_usage():
"Test that FRR only reruns over routes associated with the routemap"
logger.info("Test that FRR runs on selected re's on route-map changes")
r1.vtysh_cmd("conf\nroute-map static permit 10\nset src 192.168.215.1")
r1.vtysh_cmd("conf\naccess-list 5 seq 5 permit 10.0.0.44/32")
r1.vtysh_cmd("conf\naccess-list 10 seq 5 permit 10.0.1.0/24")
- r1.vtysh_cmd("conf\nroute-map sharp permit 10\nmatch ip address 10\nset src 192.168.214.1")
+ r1.vtysh_cmd(
+ "conf\nroute-map sharp permit 10\nmatch ip address 10\nset src 192.168.214.1"
+ )
r1.vtysh_cmd("conf\nroute-map sharp permit 20\nset src 192.168.213.1")
r1.vtysh_cmd("conf\nip protocol static route-map static")
r1.vtysh_cmd("conf\nip protocol sharp route-map sharp")
static_rmapfile = "%s/r1/static_rmap.ref" % (thisDir)
expected = open(static_rmapfile).read().rstrip()
- expected = ('\n'.join(expected.splitlines()) + '\n').rstrip()
+ expected = ("\n".join(expected.splitlines()) + "\n").rstrip()
actual = r1.vtysh_cmd("show route-map static")
- actual = ('\n'.join(actual.splitlines()) + '\n').rstrip()
- logger.info("Does the show route-map static command run the correct number of times")
+ actual = ("\n".join(actual.splitlines()) + "\n").rstrip()
+ logger.info(
+ "Does the show route-map static command run the correct number of times"
+ )
- diff = topotest.get_textdiff(actual, expected,
- title1 = "Actual Route-map output",
- title2 = "Expected Route-map output")
+ diff = topotest.get_textdiff(
+ actual,
+ expected,
+ title1="Actual Route-map output",
+ title2="Expected Route-map output",
+ )
if diff:
logger.info("Actual:")
logger.info(actual)
logger.info("Expected:")
logger.info(expected)
srun = r1.vtysh_cmd("show run")
- srun = ('\n'.join(srun.splitlines()) + '\n').rstrip()
+ srun = ("\n".join(srun.splitlines()) + "\n").rstrip()
logger.info("Show run")
logger.info(srun)
assert 0, "r1 static route processing:\n"
sharp_rmapfile = "%s/r1/sharp_rmap.ref" % (thisDir)
expected = open(sharp_rmapfile).read().rstrip()
- expected = ('\n'.join(expected.splitlines()) + '\n').rstrip()
+ expected = ("\n".join(expected.splitlines()) + "\n").rstrip()
actual = r1.vtysh_cmd("show route-map sharp")
- actual = ('\n'.join(actual.splitlines()) + '\n').rstrip()
+ actual = ("\n".join(actual.splitlines()) + "\n").rstrip()
logger.info("Does the show route-map sharp command run the correct number of times")
- diff = topotest.get_textdiff(actual, expected,
- title1 = "Actual Route-map output",
- title2 = "Expected Route-map output")
+ diff = topotest.get_textdiff(
+ actual,
+ expected,
+ title1="Actual Route-map output",
+ title2="Expected Route-map output",
+ )
if diff:
logger.info("Actual:")
logger.info(actual)
logger.info("Expected:")
logger.info(expected)
srun = r1.vtysh_cmd("show run")
- srun = ('\n'.join(srun.splitlines()) + '\n').rstrip()
+ srun = ("\n".join(srun.splitlines()) + "\n").rstrip()
logger.info("Show run:")
logger.info(srun)
assert 0, "r1 sharp route-map processing:\n"
- logger.info("Add a extension to the static route-map to see the static route go away")
+ logger.info(
+ "Add a extension to the static route-map to see the static route go away"
+ )
r1.vtysh_cmd("conf\nroute-map sharp deny 5\nmatch ip address 5")
sleep(2)
# we are only checking the kernel here as that this will give us the implied
logger.info("Test that the routes installed are correct")
sharp_ipfile = "%s/r1/iproute.ref" % (thisDir)
expected = open(sharp_ipfile).read().rstrip()
- expected = ('\n'.join(expected.splitlines()) + '\n').rstrip()
+ expected = ("\n".join(expected.splitlines()) + "\n").rstrip()
actual = r1.run("ip route show")
- actual = ('\n'.join(actual.splitlines()) + '\n').rstrip()
+ actual = ("\n".join(actual.splitlines()) + "\n").rstrip()
actual = re.sub(r" nhid [0-9][0-9]", "", actual)
actual = re.sub(r" proto sharp", " proto XXXX", actual)
actual = re.sub(r" proto static", " proto XXXX", actual)
actual = re.sub(r" proto XXXX ", " proto XXXX ", actual)
actual = re.sub(r" metric", " metric", actual)
actual = re.sub(r" link ", " link ", actual)
- diff = topotest.get_textdiff(actual, expected,
- title1 = "Actual ip route show",
- title2 = "Expected ip route show")
+ diff = topotest.get_textdiff(
+ actual, expected, title1="Actual ip route show", title2="Expected ip route show"
+ )
if diff:
logger.info("Actual:")
logger.info("Expected:")
logger.info(expected)
srun = r1.vtysh_cmd("show run")
- srun = ('\n'.join(srun.splitlines()) + '\n').rstrip()
+ srun = ("\n".join(srun.splitlines()) + "\n").rstrip()
logger.info("Show run:")
logger.info(srun)
assert 0, "r1 ip route show is not correct:"
+
def test_memory_leak():
"Run the memory leak test and report results."
tgen = get_topogen()