summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/fe_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/fe_client.py')
-rwxr-xr-xtests/topotests/lib/fe_client.py110
1 files changed, 74 insertions, 36 deletions
diff --git a/tests/topotests/lib/fe_client.py b/tests/topotests/lib/fe_client.py
index a47544633b..d61bc850b4 100755
--- a/tests/topotests/lib/fe_client.py
+++ b/tests/topotests/lib/fe_client.py
@@ -18,6 +18,8 @@ import sys
import time
from pathlib import Path
+from munet.base import Timeout
+
CWD = os.path.dirname(os.path.realpath(__file__))
# This is painful but works if you have installed protobuf would be better if we
@@ -80,6 +82,13 @@ GET_DATA_FLAG_EXACT = 0x4
MSG_NOTIFY_FMT = "=B7x"
NOTIFY_FIELD_RESULT_TYPE = 0
+MSG_NOTIFY_SELECT_FMT = "=B7x"
+
+MSG_SESSION_REQ_FMT = "=8x"
+
+MSG_SESSION_REPLY_FMT = "=B7x"
+SESSION_REPLY_FIELD_CREATED = 0
+
#
# Native message codes
#
@@ -88,6 +97,9 @@ MSG_CODE_ERROR = 0
MSG_CODE_TREE_DATA = 2
MSG_CODE_GET_DATA = 3
MSG_CODE_NOTIFY = 4
+MSG_CODE_NOTIFY_SELECT = 9
+MSG_CODE_SESSION_REQ = 10
+MSG_CODE_SESSION_REPLY = 11
msg_native_formats = {
MSG_CODE_ERROR: MSG_ERROR_FMT,
@@ -95,6 +107,9 @@ msg_native_formats = {
MSG_CODE_TREE_DATA: MSG_TREE_DATA_FMT,
MSG_CODE_GET_DATA: MSG_GET_DATA_FMT,
MSG_CODE_NOTIFY: MSG_NOTIFY_FMT,
+ MSG_CODE_NOTIFY_SELECT: MSG_NOTIFY_SELECT_FMT,
+ MSG_CODE_SESSION_REQ: MSG_SESSION_REQ_FMT,
+ MSG_CODE_SESSION_REPLY: MSG_SESSION_REPLY_FMT,
}
@@ -177,27 +192,44 @@ class Session:
client_id = 1
- def __init__(self, sock):
+ def __init__(self, sock, use_protobuf):
self.sock = sock
self.next_req_id = 1
- req = mgmt_pb2.FeMessage()
- req.register_req.client_name = "test-client"
- self.send_pb_msg(req)
- logging.debug("Sent FeRegisterReq: %s", req)
+ if use_protobuf:
+ req = mgmt_pb2.FeMessage()
+ req.register_req.client_name = "test-client"
+ self.send_pb_msg(req)
+ logging.debug("Sent FeRegisterReq: %s", req)
- req = mgmt_pb2.FeMessage()
- req.session_req.create = 1
- req.session_req.client_conn_id = Session.client_id
- Session.client_id += 1
- self.send_pb_msg(req)
- logging.debug("Sent FeSessionReq: %s", req)
+ req = mgmt_pb2.FeMessage()
+ req.session_req.create = 1
+ req.session_req.client_conn_id = Session.client_id
+ Session.client_id += 1
+ self.send_pb_msg(req)
+ logging.debug("Sent FeSessionReq: %s", req)
- reply = self.recv_pb_msg(mgmt_pb2.FeMessage())
- logging.debug("Received FeSessionReply: %s", repr(reply))
+ reply = self.recv_pb_msg(mgmt_pb2.FeMessage())
+ logging.debug("Received FeSessionReply: %s", repr(reply))
- assert reply.session_reply.success
- self.sess_id = reply.session_reply.session_id
+ assert reply.session_reply.success
+ self.sess_id = reply.session_reply.session_id
+ else:
+ self.sess_id = 0
+ mdata, req_id = self.get_native_msg_header(MSG_CODE_SESSION_REQ)
+ mdata += struct.pack(MSG_SESSION_REQ_FMT)
+ mdata += "test-client".encode("utf-8") + b"\x00"
+
+ self.send_native_msg(mdata)
+ logging.debug("Sent native SESSION-REQ")
+
+ mhdr, mfixed, mdata = self.recv_native_msg()
+ if mhdr[HDR_FIELD_CODE] == MSG_CODE_SESSION_REPLY:
+ logging.debug("Recv native SESSION-REQ Message: %s: %s", mfixed, mdata)
+ else:
+ raise Exception(f"Recv NON-SESSION-REPLY Message: {mfixed}: {mdata}")
+ assert mfixed[0]
+ self.sess_id = mhdr[HDR_FIELD_SESS_ID]
def close(self, clean=True):
if clean:
@@ -308,17 +340,22 @@ class Session:
logging.debug("Received GET: %s: %s", mfixed, mdata)
return result
- # def subscribe(self, notif_xpath):
- # # Create the message
- # mdata, req_id = self.get_native_msg_header(MSG_CODE_SUBSCRIBE)
- # mdata += struct.pack(MSG_SUBSCRIBE_FMT, MSG_FORMAT_JSON)
- # mdata += notif_xpath.encode("utf-8") + b"\x00"
+ def add_notify_select(self, replace, notif_xpaths):
+ # Create the message
+ mdata, req_id = self.get_native_msg_header(MSG_CODE_NOTIFY_SELECT)
+ mdata += struct.pack(MSG_NOTIFY_SELECT_FMT, replace)
+
+ for xpath in notif_xpaths:
+ mdata += xpath.encode("utf-8") + b"\x00"
- # self.send_native_msg(mdata)
- # logging.debug("Sent SUBSCRIBE")
+ self.send_native_msg(mdata)
+ logging.debug("Sent NOTIFY_SELECT")
def recv_notify(self, xpaths=None):
- while True:
+ if xpaths:
+ self.add_notify_select(True, xpaths)
+
+ for remaining in Timeout(60):
logging.debug("Waiting for Notify Message")
mhdr, mfixed, mdata = self.recv_native_msg()
if mhdr[HDR_FIELD_CODE] == MSG_CODE_NOTIFY:
@@ -328,19 +365,11 @@ class Session:
vsplit = mhdr[HDR_FIELD_VSPLIT]
assert mdata[vsplit - 1] == 0
- xpath = mdata[: vsplit - 1].decode("utf-8")
-
assert mdata[-1] == 0
- result = mdata[vsplit:-1].decode("utf-8")
-
- if not xpaths:
- return result
- js = json.loads(result)
- key = [x for x in js.keys()][0]
- for xpath in xpaths:
- if key.startswith(xpath):
- return result
- logging.debug("'%s' didn't match xpath filters", key)
+ # xpath = mdata[: vsplit - 1].decode("utf-8")
+ return mdata[vsplit:-1].decode("utf-8")
+ else:
+ raise TimeoutError("Timeout waiting for notifications")
def __parse_args():
@@ -365,6 +394,9 @@ def __parse_args():
"-q", "--query", nargs="+", metavar="XPATH", help="xpath[s] to query"
)
parser.add_argument("-s", "--server", default=MPATH, help="path to server socket")
+ parser.add_argument(
+ "--use-protobuf", action="store_true", help="Use protobuf when there's a choice"
+ )
parser.add_argument("-v", "--verbose", action="store_true", help="Be verbose")
args = parser.parse_args()
@@ -381,13 +413,15 @@ def __server_connect(spath):
logging.warn("retry server connection in .5s (%s)", os.strerror(ec))
time.sleep(0.5)
logging.info("Connected to server on %s", spath)
+ # Set a timeout of 5 minutes for socket operations.
+ sock.settimeout(60 * 5)
return sock
def __main():
args = __parse_args()
sock = __server_connect(Path(args.server))
- sess = Session(sock)
+ sess = Session(sock, use_protobuf=args.use_protobuf)
if args.query:
# Performa an xpath query
@@ -412,8 +446,12 @@ def main():
__main()
except KeyboardInterrupt:
logging.info("Exiting")
+ except TimeoutError as error:
+ logging.error("Timeout: %s", error)
+ sys.exit(2)
except Exception as error:
logging.error("Unexpected error exiting: %s", error, exc_info=True)
+ sys.exit(1)
if __name__ == "__main__":