assert result is None, "Received RPKI extended community"
+def test_show_bgp_rpki_as_number():
+ tgen = get_topogen()
+
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r3"]:
+ logger.info("{}: checking if rtrd is running".format(rname))
+ if rtrd_process[rname].poll() is not None:
+ pytest.skip(tgen.errors)
+
+ step("Check RPKI prefixes for ASN 65531")
+
+ rname = "r2"
+ output = json.loads(tgen.gears[rname].vtysh_cmd("show rpki as-number 65531 json"))
+
+ # Expected output should show no prefixes for this ASN
+ expected = {"ipv4PrefixCount": 0, "ipv6PrefixCount": 0, "prefixes": []}
+
+ assert output == expected, "Found unexpected RPKI prefixes for ASN 65531"
+
+
+def test_show_bgp_rpki_as_number_65530():
+ tgen = get_topogen()
+
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r3"]:
+ logger.info("{}: checking if rtrd is running".format(rname))
+ if rtrd_process[rname].poll() is not None:
+ pytest.skip(tgen.errors)
+
+ step("Check RPKI prefixes for ASN 65530")
+
+ rname = "r2"
+ output = json.loads(tgen.gears[rname].vtysh_cmd("show rpki as-number 65530 json"))
+
+ expected = {
+ "prefixes": [
+ {
+ "prefix": "198.51.100.0",
+ "prefixLenMin": 24,
+ "prefixLenMax": 24,
+ "asn": 65530,
+ },
+ {
+ "prefix": "203.0.113.0",
+ "prefixLenMin": 24,
+ "prefixLenMax": 24,
+ "asn": 65530,
+ },
+ ],
+ "ipv4PrefixCount": 2,
+ "ipv6PrefixCount": 0,
+ }
+
+ assert (
+ output == expected
+ ), "RPKI prefixes for ASN 65530 do not match expected output"
+
+
+def test_rpki_stop_and_check_connection():
+ tgen = get_topogen()
+
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r3"]:
+ logger.info("{}: checking if rtrd is running".format(rname))
+ if rtrd_process[rname].poll() is not None:
+ pytest.skip(tgen.errors)
+
+ step("Stop RPKI on r2")
+ rname = "r2"
+ tgen.gears[rname].vtysh_cmd("rpki stop")
+
+ step("Check RPKI cache connection status")
+ output = json.loads(tgen.gears[rname].vtysh_cmd("show rpki cache-connection json"))
+
+ expected = {"error": "No connection to RPKI cache server."}
+ assert (
+ output == expected
+ ), "RPKI cache connection status does not show as disconnected"
+
+
+def test_rpki_start_and_check_connection():
+ tgen = get_topogen()
+
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r3"]:
+ logger.info("{}: checking if rtrd is running".format(rname))
+ if rtrd_process[rname].poll() is not None:
+ pytest.skip(tgen.errors)
+
+ step("Start RPKI on r2")
+ rname = "r2"
+ tgen.gears[rname].vtysh_cmd("rpki start")
+
+ def _check_rpki_connection():
+ output = json.loads(
+ tgen.gears[rname].vtysh_cmd("show rpki cache-connection json")
+ )
+ # We expect to see a connected group and at least one connection
+ return "connectedGroup" in output and "connections" in output
+
+ step("Check RPKI cache connection status")
+ _, result = topotest.run_and_expect(
+ _check_rpki_connection, True, count=60, wait=0.5
+ )
+ assert result, "RPKI cache connection did not establish after start"
+
+
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))