import threading
import pytest
+from lib.common_config import retry
from lib.topogen import Topogen
from lib.topotest import json_cmp
tgen.stop_topology()
+# Verify the backend test client has connected
+@retry(retry_timeout=10)
+def check_client_connect(r1):
+ out = r1.vtysh_cmd("show mgmt backend-adapter all")
+ assert "mgmtd-testc" in out
+
+
def test_backend_rpc(tgen):
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
r1 = tgen.gears["r1"]
+ # Run the backend test client which registers to handle the `clear ip rip` command.
be_client_path = "/usr/lib/frr/mgmtd_testc"
rc, _, _ = r1.net.cmd_status(be_client_path + " --help")
t = threading.Thread(target=run_testc)
t.start()
+ # We need to wait for mgmtd_testc to connect before issuing the command.
+ res = check_client_connect(r1)
+ assert res is None
+
r1.vtysh_cmd("clear ip rip vrf testname")
t.join()