diff options
Diffstat (limited to 'tests/topotests/pim_autorp/test_pim_autorp.py')
| -rw-r--r-- | tests/topotests/pim_autorp/test_pim_autorp.py | 207 | 
1 files changed, 207 insertions, 0 deletions
diff --git a/tests/topotests/pim_autorp/test_pim_autorp.py b/tests/topotests/pim_autorp/test_pim_autorp.py new file mode 100644 index 0000000000..5aecce942e --- /dev/null +++ b/tests/topotests/pim_autorp/test_pim_autorp.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python +# SPDX-License-Identifier: ISC + +# +# test_pim_autorp.py +# +# Copyright (c) 2024 ATCorp +# Nathan Bahr +# + +import os +import sys +import pytest + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib.topogen import Topogen, get_topogen +from lib.topolog import logger +from lib.pim import scapy_send_autorp_raw_packet, verify_pim_rp_info, verify_pim_rp_info_is_empty +from lib.common_config import step, write_test_header + +from time import sleep + +""" +test_pim_autorp.py: Test general PIM AutoRP functionality +""" + +TOPOLOGY = """ +   Basic AutoRP functionality + +    +---+---+                      +---+---+ +    |       |    10.10.76.0/24     |       | +    +  R1   + <------------------> +  R2   | +    |       | .1                .2 |       | +    +---+---+                      +---+---+ +""" + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + +# Required to instantiate the topology builder class. +pytestmark = [pytest.mark.pimd] + + +def build_topo(tgen): +    "Build function" + +    # Create routers +    tgen.add_router("r1") +    tgen.add_router("r2") + +    # Create link between router 1 and 2 +    switch = tgen.add_switch("s1-2") +    switch.add_link(tgen.gears["r1"]) +    switch.add_link(tgen.gears["r2"]) + +def setup_module(mod): +    logger.info("PIM AutoRP basic functionality:\n {}".format(TOPOLOGY)) + +    tgen = Topogen(build_topo, mod.__name__) +    tgen.start_topology() + +    # Router 1 will be the router configured with "fake" autorp configuration, so give it a default route +    # to router 2 so that routing to the RP address is not an issue +    # r1_defrt_setup_cmds = [ +    #     "ip route add default via 10.10.76.1 dev r1-eth0", +    # ] +    # for cmd in r1_defrt_setup_cmds: +    #     tgen.net["r1"].cmd(cmd) + +    logger.info("Testing PIM AutoRP support") +    router_list = tgen.routers() +    for rname, router in router_list.items(): +        logger.info("Loading router %s" % rname) +        router.load_frr_config(os.path.join(CWD, "{}/frr.conf".format(rname))) + +    # Initialize all routers. +    tgen.start_router() +    for router in router_list.values(): +        if router.has_version("<", "4.0"): +            tgen.set_error("unsupported version") + + +def teardown_module(mod): +    "Teardown the pytest environment" +    tgen = get_topogen() +    tgen.stop_topology() + +def test_pim_autorp_discovery_single_rp(request): +    "Test PIM AutoRP Discovery with single RP" +    tgen = get_topogen() +    tc_name = request.node.name +    write_test_header(tc_name) + +    if tgen.routers_have_failure(): +        pytest.skip(tgen.errors) + +    step("Start with no RP configuration") +    result = verify_pim_rp_info_is_empty(tgen, "r1") +    assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + +    step("Send AutoRP packet from r1 to r2") +    # 1 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/4 +    data = "01005e00012800127f55cfb1080045c00030700c000008110abe0a0a4c01e000012801f001f0001c798b12010005000000000a0a4c0103010004e0000000" +    scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data) + +    step("Verify rp-info from AutoRP packet") +    result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/4", "r2-eth0", "10.10.76.1", "AutoRP", False, "ipv4", True) +    assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + +    step("Verify AutoRP configuration times out") +    result = verify_pim_rp_info_is_empty(tgen, "r2") +    assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + +def test_pim_autorp_discovery_multiple_rp(request): +    "Test PIM AutoRP Discovery with multiple RP's" +    tgen = get_topogen() +    tc_name = request.node.name +    write_test_header(tc_name) + +    if tgen.routers_have_failure(): +        pytest.skip("skipped because of router(s) failure") + +    step("Start with no RP configuration") +    result = verify_pim_rp_info_is_empty(tgen, "r2") +    assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + +    step("Send AutoRP packet from r1 to r2") +    # 2 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/8, 10.10.76.3, group(s) 225.0.0.0/8 +    data = "01005e00012800127f55cfb1080045c0003c700c000008110ab20a0a4c01e000012801f001f000283f5712020005000000000a0a4c0103010008e00000000a0a4c0303010008e1000000" +    scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data) + +    step("Verify rp-info from AutoRP packet") +    result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/8", "r2-eth0", "10.10.76.1", "AutoRP", False, "ipv4", True) +    assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) +    result = verify_pim_rp_info(tgen, None, "r2", "225.0.0.0/8", "r2-eth0", "10.10.76.3", "AutoRP", False, "ipv4", True) +    assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + +def test_pim_autorp_discovery_static(request): +    "Test PIM AutoRP Discovery with Static RP" +    tgen = get_topogen() +    tc_name = request.node.name +    write_test_header(tc_name) + +    if tgen.routers_have_failure(): +        pytest.skip("skipped because of router(s) failure") + +    step("Start with no RP configuration") +    result = verify_pim_rp_info_is_empty(tgen, "r2") +    assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + +    step("Add static RP configuration to r2") +    rnode = tgen.routers()["r2"] +    rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'rp 10.10.76.3 224.0.0.0/4'") + +    step("Verify static rp-info from r2") +    result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/4", "r2-eth0", "10.10.76.3", "Static", False, "ipv4", True) +    assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + +    step("Send AutoRP packet from r1 to r2") +    # 1 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/4 +    data = "01005e00012800127f55cfb1080045c00030700c000008110abe0a0a4c01e000012801f001f0001c798b12010005000000000a0a4c0103010004e0000000" +    scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data) + +    step("Verify rp-info from AutoRP packet") +    result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/4", "r2-eth0", "10.10.76.1", "AutoRP", False, "ipv4", True) +    assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + +def test_pim_autorp_announce_group(request): +    "Test PIM AutoRP Announcement with a single group" +    tgen = get_topogen() +    tc_name = request.node.name +    write_test_header(tc_name) + +    if tgen.routers_have_failure(): +        pytest.skip("skipped because of router(s) failure") + +    step("Add candidate RP configuration to r1") +    rnode = tgen.routers()["r1"] +    rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'send-rp-announce 10.10.76.1 224.0.0.0/4'") +    step("Verify Announcement sent data") +    # TODO: Verify AutoRP mapping agent receives candidate RP announcement +    # Mapping agent is not yet implemented +    #sleep(10) +    step("Change AutoRP Announcement packet parameters") +    rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'send-rp-announce scope 8 interval 10 holdtime 60'") +    step("Verify Announcement sent data") +    # TODO: Verify AutoRP mapping agent receives updated candidate RP announcement +    # Mapping agent is not yet implemented +    #sleep(10) + + +def test_memory_leak(): +    "Run the memory leak test and report results." +    tgen = get_topogen() +    if not tgen.is_memleak_enabled(): +        pytest.skip("Memory leak test/report is disabled") + +    tgen.report_memory_leaks() + + +if __name__ == "__main__": +    args = ["-s"] + sys.argv[1:] +    sys.exit(pytest.main(args))  | 
