def build_topo(tgen):
- for routern in range(1, 3):
+ for routern in range(1, 4):
tgen.add_router("r{}".format(routern))
switch = tgen.add_switch("s1")
switch.add_link(tgen.gears["r1"])
switch.add_link(tgen.gears["r2"])
+ switch = tgen.add_switch("s2")
+ switch.add_link(tgen.gears["r2"])
+ switch.add_link(tgen.gears["r3"])
+
def setup_module(mod):
tgen = Topogen(build_topo, mod.__name__)
" -M bgpd_rpki" if rname == "r2" else "",
)
+ tgen.gears["r2"].run("ip link add vrf10 type vrf table 10")
+ tgen.gears["r2"].run("ip link set vrf10 up")
+
+ tgen.gears["r2"].run("ip link set r2-eth1 master vrf10")
+
tgen.start_router()
global rtrd_process
+ rtrd_process = {}
- rname = "r1"
-
- rtr_path = os.path.join(CWD, rname)
- log_dir = os.path.join(tgen.logdir, rname)
- log_file = os.path.join(log_dir, "rtrd.log")
+ for rname in ["r1", "r3"]:
+ rtr_path = os.path.join(CWD, rname)
+ log_dir = os.path.join(tgen.logdir, rname)
+ log_file = os.path.join(log_dir, "rtrd.log")
- tgen.gears[rname].cmd("chmod u+x {}/rtrd.py".format(rtr_path))
- rtrd_process = tgen.gears[rname].popen("{}/rtrd.py {}".format(rtr_path, log_file))
+ tgen.gears[rname].cmd("chmod u+x {}/rtrd.py".format(rtr_path))
+ rtrd_process[rname] = tgen.gears[rname].popen(
+ "{}/rtrd.py {}".format(rtr_path, log_file)
+ )
def teardown_module(mod):
tgen = get_topogen()
- logger.info("r1: sending SIGTERM to rtrd RPKI server")
- rtrd_process.kill()
+ for rname in ["r1", "r3"]:
+ logger.info("{}: sending SIGTERM to rtrd RPKI server".format(rname))
+ rtrd_process[rname].kill()
+
tgen.stop_topology()
for rname in ["r1", "r3"]:
logger.info("{}: checking if rtrd is running".format(rname))
- if rtrd_process.poll() is not None:
+ if rtrd_process[rname].poll() is not None:
pytest.skip(tgen.errors)
rname = "r2"
for rname in ["r1", "r3"]:
logger.info("{}: checking if rtrd is running".format(rname))
- if rtrd_process.poll() is not None:
+ if rtrd_process[rname].poll() is not None:
pytest.skip(tgen.errors)
def _show_rpki_no_connection(rname):
for rname in ["r1", "r3"]:
logger.info("{}: checking if rtrd is running".format(rname))
- if rtrd_process.poll() is not None:
+ if rtrd_process[rname].poll() is not None:
pytest.skip(tgen.errors)
step("Restore RPKI server configuration")
for rname in ["r1", "r3"]:
logger.info("{}: checking if rtrd is running".format(rname))
- if rtrd_process.poll() is not None:
+ if rtrd_process[rname].poll() is not None:
pytest.skip(tgen.errors)
step("Apply RPKI valid route-map on neighbor")
assert result is None, "Unexpected prefixes RPKI state on {}".format(rname)
+def test_show_bgp_rpki_prefixes_vrf():
+ tgen = get_topogen()
+
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r3"]:
+ logger.info("{}: checking if rtrd is running".format(rname))
+ if rtrd_process[rname].poll() is not None:
+ pytest.skip(tgen.errors)
+
+ step("Configure RPKI cache server on vrf10")
+
+ rname = "r2"
+ tgen.gears[rname].vtysh_cmd(
+ """
+configure
+vrf vrf10
+ rpki
+ rpki cache 192.0.2.3 15432 preference 1
+ exit
+exit
+"""
+ )
+
+ step("Check vrf10 RPKI prefix table")
+
+ expected = open(os.path.join(CWD, "{}/rpki_prefix_table.json".format(rname))).read()
+ expected_json = json.loads(expected)
+ test_func = functools.partial(show_rpki_prefixes, rname, expected_json, vrf="vrf10")
+ _, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
+ assert result is None, "Failed to see RPKI prefixes on {}".format(rname)
+
+ for rpki_state in ["valid", "notfound", None]:
+ if rpki_state:
+ step(
+ "Check RPKI state of prefixes in vrf10 BGP table: {}".format(rpki_state)
+ )
+ else:
+ step("Check prefixes in vrf10 BGP table")
+ expected = open(
+ os.path.join(
+ CWD,
+ "{}/bgp_table_rpki_{}.json".format(
+ rname, rpki_state if rpki_state else "any"
+ ),
+ )
+ ).read()
+ expected_json = json.loads(expected)
+ test_func = functools.partial(
+ show_bgp_ipv4_table_rpki, rname, rpki_state, expected_json, vrf="vrf10"
+ )
+ _, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
+ assert result is None, "Unexpected prefixes RPKI state on {}".format(rname)
+
+
+def test_show_bgp_rpki_route_map_vrf():
+ tgen = get_topogen()
+
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r3"]:
+ logger.info("{}: checking if rtrd is running".format(rname))
+ if rtrd_process[rname].poll() is not None:
+ pytest.skip(tgen.errors)
+
+ step("Apply RPKI valid route-map on vrf10 neighbor")
+
+ rname = "r2"
+ tgen.gears[rname].vtysh_cmd(
+ """
+configure
+router bgp 65002 vrf vrf10
+ address-family ipv4 unicast
+ neighbor 192.0.2.3 route-map RPKI in
+"""
+ )
+
+ for rpki_state in ["valid", "notfound", None]:
+ if rpki_state:
+ step(
+ "Check RPKI state of prefixes in vrf10 BGP table: {}".format(rpki_state)
+ )
+ else:
+ step("Check prefixes in vrf10 BGP table")
+ expected = open(
+ os.path.join(
+ CWD,
+ "{}/bgp_table_rmap_rpki_{}.json".format(
+ rname, rpki_state if rpki_state else "any"
+ ),
+ )
+ ).read()
+ expected_json = json.loads(expected)
+ test_func = functools.partial(
+ show_bgp_ipv4_table_rpki,
+ rname,
+ rpki_state,
+ expected_json,
+ vrf="vrf10",
+ )
+ _, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
+ assert result is None, "Unexpected prefixes RPKI state on {}".format(rname)
+
+
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))