diff options
| -rw-r--r-- | lib/darr.h | 37 | ||||
| -rw-r--r-- | lib/mgmt_msg_native.c | 45 | ||||
| -rw-r--r-- | lib/mgmt_msg_native.h | 105 | ||||
| -rw-r--r-- | lib/yang.c | 2 | ||||
| -rw-r--r-- | mgmtd/mgmt_fe_adapter.c | 221 | ||||
| -rw-r--r-- | tests/lib/test_darr.c | 27 | ||||
| -rwxr-xr-x | tests/topotests/lib/fe_client.py | 110 | ||||
| -rw-r--r-- | tests/topotests/mgmt_notif/test_notif.py | 6 | 
8 files changed, 506 insertions, 47 deletions
diff --git a/lib/darr.h b/lib/darr.h index 404869d9a2..2b9a0a0c02 100644 --- a/lib/darr.h +++ b/lib/darr.h @@ -24,6 +24,8 @@   *  - darr_ensure_i   *  - darr_ensure_i_mt   *  - darr_free + *  - darr_free_free + *  - darr_free_func   *  - darr_insert   *  - darr_insert_mt   *  - darr_insertz @@ -218,6 +220,41 @@ void *__darr_resize(void *a, uint count, size_t esize, struct memtype *mt);  	} while (0)  /** + * Free memory allocated for the dynamic array `A`, calling `darr_free` for + * each element of the array first. + * + * Args: + *	A: The dynamic array, can be NULL. + */ +#define darr_free_free(A)                                                      \ +	do {                                                                   \ +		for (uint __i = 0; __i < darr_len(A); __i++)                   \ +			if ((A)[__i]) {                                        \ +				struct darr_metadata *__meta =                 \ +					_darr_meta((A)[__i]);                  \ +				XFREE(__meta->mtype, __meta);                  \ +			}                                                      \ +		darr_free(A);                                                  \ +	} while (0) + +/** + * Free memory allocated for the dynamic array `A`, calling `F` routine + * for each element of the array first. + * + * Args: + *	A: The dynamic array, can be NULL. + *	F: The function to call for each element. + */ + +#define darr_free_func(A, F)                                                   \ +	do {                                                                   \ +		for (uint __i = 0; __i < darr_len(A); __i++) {                 \ +			F((A)[__i]);                                           \ +		}                                                              \ +		darr_free(A);                                                  \ +	} while (0) + +/**   * Make sure that there is room in the dynamic array `A` to add `C` elements.   *   * Available space is `darr_cap(a) - darr_len(a)`. diff --git a/lib/mgmt_msg_native.c b/lib/mgmt_msg_native.c index 39ce9abae6..b85c7d1b61 100644 --- a/lib/mgmt_msg_native.c +++ b/lib/mgmt_msg_native.c @@ -6,6 +6,7 @@   *   */  #include <zebra.h> +#include "darr.h"  #include "mgmt_msg_native.h"  DEFINE_MGROUP(MSG_NATIVE, "Native message allocations"); @@ -18,6 +19,33 @@ DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_EDIT, "native edit msg");  DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_EDIT_REPLY, "native edit reply msg");  DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_RPC, "native RPC msg");  DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_RPC_REPLY, "native RPC reply msg"); +DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_SESSION_REQ, "native session-req msg"); +DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_SESSION_REPLY, "native session-reply msg"); + + +size_t mgmt_msg_min_sizes[] = { +	[MGMT_MSG_CODE_ERROR] = sizeof(struct mgmt_msg_error), +	[MGMT_MSG_CODE_GET_TREE] = sizeof(struct mgmt_msg_get_tree), +	[MGMT_MSG_CODE_TREE_DATA] = sizeof(struct mgmt_msg_tree_data), +	[MGMT_MSG_CODE_GET_DATA] = sizeof(struct mgmt_msg_get_data), +	[MGMT_MSG_CODE_NOTIFY] = sizeof(struct mgmt_msg_notify_data), +	[MGMT_MSG_CODE_EDIT] = sizeof(struct mgmt_msg_edit), +	[MGMT_MSG_CODE_EDIT_REPLY] = sizeof(struct mgmt_msg_edit_reply), +	[MGMT_MSG_CODE_RPC] = sizeof(struct mgmt_msg_rpc), +	[MGMT_MSG_CODE_RPC_REPLY] = sizeof(struct mgmt_msg_rpc_reply), +	[MGMT_MSG_CODE_NOTIFY_SELECT] = sizeof(struct mgmt_msg_notify_select), +	[MGMT_MSG_CODE_SESSION_REQ] = sizeof(struct mgmt_msg_session_req), +	[MGMT_MSG_CODE_SESSION_REPLY] = sizeof(struct mgmt_msg_session_reply), +}; +size_t nmgmt_msg_min_sizes = sizeof(mgmt_msg_min_sizes) / +			     sizeof(*mgmt_msg_min_sizes); + +size_t mgmt_msg_get_min_size(uint code) +{ +	if (code >= nmgmt_msg_min_sizes) +		return 0; +	return mgmt_msg_min_sizes[code]; +}  int vmgmt_msg_native_send_error(struct msg_conn *conn, uint64_t sess_or_txn_id,  				uint64_t req_id, bool short_circuit_ok, @@ -50,3 +78,20 @@ int vmgmt_msg_native_send_error(struct msg_conn *conn, uint64_t sess_or_txn_id,  	mgmt_msg_native_free_msg(msg);  	return ret;  } + +const char **_mgmt_msg_native_strings_decode(const void *_sdata, int sdlen) +{ +	const char *sdata = _sdata; +	const char **strings = NULL; +	int len; + +	if (sdata[sdlen - 1] != 0) +		return NULL; + +	for (; sdlen; sdata += len, sdlen -= len) { +		*darr_append(strings) = darr_strdup(sdata); +		len = 1 + darr_strlen(strings[darr_lasti(strings)]); +	} + +	return strings; +} diff --git a/lib/mgmt_msg_native.h b/lib/mgmt_msg_native.h index 21f702cc61..76a52658cd 100644 --- a/lib/mgmt_msg_native.h +++ b/lib/mgmt_msg_native.h @@ -163,6 +163,8 @@ DECLARE_MTYPE(MSG_NATIVE_EDIT);  DECLARE_MTYPE(MSG_NATIVE_EDIT_REPLY);  DECLARE_MTYPE(MSG_NATIVE_RPC);  DECLARE_MTYPE(MSG_NATIVE_RPC_REPLY); +DECLARE_MTYPE(MSG_NATIVE_SESSION_REQ); +DECLARE_MTYPE(MSG_NATIVE_SESSION_REPLY);  /*   * Native message codes @@ -176,6 +178,9 @@ DECLARE_MTYPE(MSG_NATIVE_RPC_REPLY);  #define MGMT_MSG_CODE_EDIT_REPLY 6 /* Public API */  #define MGMT_MSG_CODE_RPC	 7 /* Public API */  #define MGMT_MSG_CODE_RPC_REPLY	 8 /* Public API */ +#define MGMT_MSG_CODE_NOTIFY_SELECT 9 /* Public API */ +#define MGMT_MSG_CODE_SESSION_REQ   10 /* Public API */ +#define MGMT_MSG_CODE_SESSION_REPLY 11 /* Public API */  /*   * Datastores @@ -426,12 +431,72 @@ _Static_assert(sizeof(struct mgmt_msg_rpc_reply) ==  		       offsetof(struct mgmt_msg_rpc_reply, data),  	       "Size mismatch"); +/** + * struct mgmt_msg_notify_select - Add notification selectors for FE client. + * + * Add xpath prefix notification selectors to limit the notifications sent + * to the front-end client. + * + * @selectors: the xpath prefixes to selectors notifications through. + * @replace: if true replace existing selectors with `selectors`. + */ +struct mgmt_msg_notify_select { +	struct mgmt_msg_header; +	uint8_t replace; +	uint8_t resv2[7]; + +	alignas(8) char selectors[]; +}; + +_Static_assert(sizeof(struct mgmt_msg_notify_select) == +		       offsetof(struct mgmt_msg_notify_select, selectors), +	       "Size mismatch"); + +/** + * struct mgmt_msg_session_req - Create or delete a front-end session. + * + * @refer_id: Zero for create, otherwise the session-id to delete. + * @req_id: For create will use as client-id. + * @client_name: For first session request the client name, otherwise empty. + */ +struct mgmt_msg_session_req { +	struct mgmt_msg_header; +	uint8_t resv2[8]; /* bug in compiler produces error w/o this */ + +	alignas(8) char client_name[]; +}; + +_Static_assert(sizeof(struct mgmt_msg_session_req) == +		       offsetof(struct mgmt_msg_session_req, client_name), +	       "Size mismatch"); + +/** + * struct mgmt_msg_session_reply - Reply to session request message. + * + * @created: true if this is a reply to a create request, otherwise 0. + * @refer_id: The session-id for the action (create or delete) just taken. + */ +struct mgmt_msg_session_reply { +	struct mgmt_msg_header; +	uint8_t created; +	uint8_t resv2[7]; +}; +  /*   * Validate that the message ends in a NUL terminating byte   */  #define MGMT_MSG_VALIDATE_NUL_TERM(msgp, len)                                  \  	((len) >= sizeof(*msgp) + 1 && ((char *)msgp)[(len)-1] == 0) +/** + * mgmt_msg_get_min_size() - Get minimum message size given the type + * @code: The type of the message (MGMT_MSG_CODE_*) + * + * Return: + *	The minimum size of a message of the given type or 0 if the message + *	code is unknown. + */ +size_t mgmt_msg_get_min_size(uint code);  /**   * Send a native message error to the other end of the connection. @@ -525,6 +590,25 @@ extern int vmgmt_msg_native_send_error(struct msg_conn *conn,  	})  /** + * mgmt_msg_native_add_str() - Append [another] string to the msg. + * @msg: (IN/OUT) Pointer to the native message, variable may be updated. + * @s: string to append. + * + * Append string @s to the native message @msg. @msg is assumed to have a + * sequence of NUL-terminated strings at the end of it. This function appends + * the string @s and it's NUL terminating octet to the message. + * + * NOTE: Be aware @msg pointer may change as a result of reallocating the + * message to fit the new data. Any other pointers into the old message should + * be discarded. + */ +#define mgmt_msg_native_add_str(msg, s)                                        \ +	do {                                                                   \ +		int __len = strlen(s) + 1;                                     \ +		mgmt_msg_native_append(msg, s, __len);                         \ +	} while (0) + +/**   * mgmt_msg_native_send_msg(msg, short_circuit_ok) - Send a native msg.   * @conn: the mgmt_msg connection.   * @msg: the native message. @@ -689,6 +773,27 @@ extern int vmgmt_msg_native_send_error(struct msg_conn *conn,  #define mgmt_msg_native_data_len_decode(msg, msglen)                           \  	((msglen) - sizeof(*msg) - msg->vsplit) +/** + * mgmt_msg_native_strings_decode() - Get dynamic array of str ptrs from the msg. + * @msg: Pointer to the native message. + * @msglen: Length of the message. + * @sdata: pointer to the variable length string data at end of @msg. + * + * Given a pointer to a sequence of NUL-terminated strings allocate + * and return a dynamic array of dynamic array strings. This function + * can be used to decode a message that was built using + * mgmt_msg_native_add_str(). + * + * Return: a dynamic array (darr) of string pointers, or NULL if the message + * is corrupt. + */ +#define mgmt_msg_native_strings_decode(msg, msg_len, sdata)                    \ +	_mgmt_msg_native_strings_decode(sdata,                                 \ +					(msg_len) - ((sdata) - (char *)(msg))) + +extern const char **_mgmt_msg_native_strings_decode(const void *sdata, +						    int sdlen); +  #ifdef __cplusplus  }  #endif diff --git a/lib/yang.c b/lib/yang.c index 702fcf436d..44459df4a5 100644 --- a/lib/yang.c +++ b/lib/yang.c @@ -897,7 +897,7 @@ char *yang_convert_lyd_format(const char *data, size_t data_len,  	assert(out_format != LYD_LYB); -	if (in_format != LYD_LYB && !MGMT_MSG_VALIDATE_NUL_TERM(data, data_len)) { +	if (in_format != LYD_LYB && (!data_len || data[data_len - 1] != 0)) {  		zlog_err("Corrupt input data, no NUL terminating byte");  		return NULL;  	} diff --git a/mgmtd/mgmt_fe_adapter.c b/mgmtd/mgmt_fe_adapter.c index fc1bde0b38..5f53c928a4 100644 --- a/mgmtd/mgmt_fe_adapter.c +++ b/mgmtd/mgmt_fe_adapter.c @@ -43,6 +43,7 @@ struct mgmt_fe_session_ctx {  	uint64_t txn_id;  	uint64_t cfg_txn_id;  	uint8_t ds_locked[MGMTD_DS_MAX_ID]; +	const char **notify_xpaths;  	struct event *proc_cfg_txn_clnp;  	struct event *proc_show_txn_clnp; @@ -489,6 +490,26 @@ static int fe_adapter_send_get_reply(struct mgmt_fe_session_ctx *session,  	return fe_adapter_send_msg(session->adapter, &fe_msg, false);  } +static int fe_adapter_conn_send_error(struct msg_conn *conn, +				      uint64_t session_id, uint64_t req_id, +				      bool short_circuit_ok, int16_t error, +				      const char *errfmt, ...) PRINTFRR(6, 7); +static int fe_adapter_conn_send_error(struct msg_conn *conn, uint64_t session_id, +				      uint64_t req_id, bool short_circuit_ok, +				      int16_t error, const char *errfmt, ...) +{ +	va_list ap; +	int ret; + +	va_start(ap, errfmt); + +	ret = vmgmt_msg_native_send_error(conn, session_id, req_id, +					  short_circuit_ok, error, errfmt, ap); +	va_end(ap); + +	return ret; +} +  static int fe_adapter_send_error(struct mgmt_fe_session_ctx *session,  				 uint64_t req_id, bool short_circuit_ok,  				 int16_t error, const char *errfmt, ...) @@ -1169,6 +1190,88 @@ static int fe_adapter_send_edit_reply(struct mgmt_fe_session_ctx *session,  	return ret;  } +static int +fe_adapter_native_send_session_reply(struct mgmt_fe_client_adapter *adapter, +				     uint64_t req_id, uint64_t session_id, +				     bool created) +{ +	struct mgmt_msg_session_reply *msg; +	int ret; + +	msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_session_reply, 0, +					MTYPE_MSG_NATIVE_SESSION_REPLY); +	msg->refer_id = session_id; +	msg->req_id = req_id; +	msg->code = MGMT_MSG_CODE_SESSION_REPLY; +	msg->created = created; + +	__dbg("Sending session-reply from adapter %s to session-id %" PRIu64 +	      " req-id %" PRIu64 " len %u", +	      adapter->name, session_id, req_id, +	      mgmt_msg_native_get_msg_len(msg)); + +	ret = fe_adapter_send_native_msg(adapter, msg, +					 mgmt_msg_native_get_msg_len(msg), +					 false); +	mgmt_msg_native_free_msg(msg); + +	return ret; +} + +/** + * fe_adapter_handle_session_req() - Handle a session-req message from a FE client. + * @msg_raw: the message data. + * @msg_len: the length of the message data. + */ +static void fe_adapter_handle_session_req(struct mgmt_fe_client_adapter *adapter, +					  void *__msg, size_t msg_len) +{ +	struct mgmt_msg_session_req *msg = __msg; +	struct mgmt_fe_session_ctx *session; +	uint64_t client_id; + +	__dbg("Got session-req creating: %u for refer-id %" PRIu64 " from '%s'", +	      msg->refer_id == 0, msg->refer_id, adapter->name); + +	if (msg->refer_id) { +		uint64_t session_id = msg->refer_id; + +		session = mgmt_session_id2ctx(session_id); +		if (!session) { +			fe_adapter_conn_send_error( +				adapter->conn, session_id, msg->req_id, false, +				-EINVAL, +				"No session to delete for session-id: %" PRIu64, +				session_id); +			return; +		} +		fe_adapter_native_send_session_reply(adapter, msg->req_id, +						     session_id, false); +		mgmt_fe_cleanup_session(&session); +		return; +	} + +	client_id = msg->req_id; + +	/* See if we have a client name to register */ +	if (msg_len > sizeof(*msg)) { +		if (!MGMT_MSG_VALIDATE_NUL_TERM(msg, msg_len)) { +			fe_adapter_conn_send_error( +				adapter->conn, client_id, msg->req_id, false, +				-EINVAL, +				"Corrupt session-req message rcvd from client-id: %" PRIu64, +				client_id); +			return; +		} +		__dbg("Set client-name to '%s'", msg->client_name); +		strlcpy(adapter->name, msg->client_name, sizeof(adapter->name)); +	} + +	session = mgmt_fe_create_session(adapter, client_id); +	fe_adapter_native_send_session_reply(adapter, client_id, +					     session->session_id, true); +} +  /**   * fe_adapter_handle_get_data() - Handle a get-tree message from a FE client.   * @session: the client session. @@ -1402,9 +1505,44 @@ static void fe_adapter_handle_edit(struct mgmt_fe_session_ctx *session,  }  /** + * fe_adapter_handle_notify_select() - Handle an Notify Select message. + * @session: the client session. + * @__msg: the message data. + * @msg_len: the length of the message data. + */ +static void fe_adapter_handle_notify_select(struct mgmt_fe_session_ctx *session, +					    void *__msg, size_t msg_len) +{ +	struct mgmt_msg_notify_select *msg = __msg; +	uint64_t req_id = msg->req_id; +	const char **selectors = NULL; +	const char **new; + +	/* An empty message clears the selectors */ +	if (msg_len >= sizeof(*msg)) { +		selectors = mgmt_msg_native_strings_decode(msg, msg_len, +							   msg->selectors); +		if (!selectors) { +			fe_adapter_send_error(session, req_id, false, -EINVAL, +					      "Invalid message"); +			return; +		} +	} +	if (msg->replace) { +		darr_free_free(session->notify_xpaths); +		session->notify_xpaths = selectors; +	} else { +		new = darr_append_nz(session->notify_xpaths, +				     darr_len(selectors)); +		memcpy(new, selectors, darr_len(selectors) * sizeof(*selectors)); +		darr_free(selectors); +	} +} + +/**   * fe_adapter_handle_rpc() - Handle an RPC message from an FE client.   * @session: the client session. - * @msg_raw: the message data. + * @__msg: the message data.   * @msg_len: the length of the message data.   */  static void fe_adapter_handle_rpc(struct mgmt_fe_session_ctx *session, @@ -1493,6 +1631,28 @@ static void fe_adapter_handle_native_msg(struct mgmt_fe_client_adapter *adapter,  					 size_t msg_len)  {  	struct mgmt_fe_session_ctx *session; +	size_t min_size = mgmt_msg_get_min_size(msg->code); + +	if (msg_len < min_size) { +		if (!min_size) +			__log_err("adapter %s: recv msg refer-id %" PRIu64 +				  " unknown message type %u", +				  adapter->name, msg->refer_id, msg->code); +		else +			__log_err("adapter %s: recv msg refer-id %" PRIu64 +				  " short (%zu<%zu) msg for type %u", +				  adapter->name, msg->refer_id, msg_len, +				  min_size, msg->code); +		return; +	} + +	if (msg->code == MGMT_MSG_CODE_SESSION_REQ) { +		__dbg("adapter %s: session-id %" PRIu64 +		      " received SESSION_REQ message", +		      adapter->name, msg->refer_id); +		fe_adapter_handle_session_req(adapter, msg, msg_len); +		return; +	}  	session = mgmt_session_id2ctx(msg->refer_id);  	if (!session) { @@ -1503,13 +1663,26 @@ static void fe_adapter_handle_native_msg(struct mgmt_fe_client_adapter *adapter,  	assert(session->adapter == adapter);  	switch (msg->code) { -	case MGMT_MSG_CODE_GET_DATA: -		fe_adapter_handle_get_data(session, msg, msg_len); -		break;  	case MGMT_MSG_CODE_EDIT: +		__dbg("adapter %s: session-id %" PRIu64 " received EDIT message", +		      adapter->name, msg->refer_id);  		fe_adapter_handle_edit(session, msg, msg_len);  		break; +	case MGMT_MSG_CODE_NOTIFY_SELECT: +		__dbg("adapter %s: session-id %" PRIu64 +		      " received NOTIFY_SELECT message", +		      adapter->name, msg->refer_id); +		fe_adapter_handle_notify_select(session, msg, msg_len); +		break; +	case MGMT_MSG_CODE_GET_DATA: +		__dbg("adapter %s: session-id %" PRIu64 +		      " received GET_DATA message", +		      adapter->name, msg->refer_id); +		fe_adapter_handle_get_data(session, msg, msg_len); +		break;  	case MGMT_MSG_CODE_RPC: +		__dbg("adapter %s: session-id %" PRIu64 " received RPC message", +		      adapter->name, msg->refer_id);  		fe_adapter_handle_rpc(session, msg, msg_len);  		break;  	default: @@ -1554,14 +1727,48 @@ void mgmt_fe_adapter_send_notify(struct mgmt_msg_notify_data *msg, size_t msglen  {  	struct mgmt_fe_client_adapter *adapter;  	struct mgmt_fe_session_ctx *session; +	struct nb_node *nb_node; +	const char **xpath_prefix; +	const char *notif; +	bool sendit; +	uint len;  	assert(msg->refer_id == 0); +	notif = mgmt_msg_native_xpath_decode(msg, msglen); +	if (!notif) { +		__log_err("Corrupt notify msg"); +		return; +	} + +	/* +	 * We need the nb_node to obtain a path which does not include any +	 * specific list entry selectors +	 */ +	nb_node = nb_node_find(notif); +	if (!nb_node) { +		__log_err("No schema found for notification: %s", notif); +		return; +	} +  	FOREACH_ADAPTER_IN_LIST (adapter) {  		FOREACH_SESSION_IN_LIST (adapter, session) { -			msg->refer_id = session->session_id; -			(void)fe_adapter_send_native_msg(adapter, msg, msglen, -							 false); +			/* If no selectors then always send */ +			sendit = !session->notify_xpaths; +			darr_foreach_p (session->notify_xpaths, xpath_prefix) { +				len = strlen(*xpath_prefix); +				if (!strncmp(*xpath_prefix, notif, len) || +				    !strncmp(*xpath_prefix, nb_node->xpath, +					     len)) { +					sendit = true; +					break; +				} +			} +			if (sendit) { +				msg->refer_id = session->session_id; +				(void)fe_adapter_send_native_msg(adapter, msg, +								 msglen, false); +			}  		}  	}  	msg->refer_id = 0; diff --git a/tests/lib/test_darr.c b/tests/lib/test_darr.c index 74aedac4b7..87f9e3e564 100644 --- a/tests/lib/test_darr.c +++ b/tests/lib/test_darr.c @@ -20,6 +20,8 @@   * [x] - darr_foreach_i   * [x] - darr_foreach_p   * [x] - darr_free + * [x] - darr_free_free + * [x] - darr_free_func   * [x] - darr_insert   * [ ] - darr_insertz   * [x] - darr_insert_n @@ -318,6 +320,8 @@ static void test_string(void)  	uint addlen = strlen(add);  	char *da1 = NULL;  	char *da2 = NULL; +	const char **strings = NULL; +	uint sum = 0;  	assert(darr_strlen(da1) == 0); @@ -412,6 +416,29 @@ static void test_string(void)  	da1 = darr_in_strcatf(da1, "0123456789: %08x", 0xDEADBEEF);  	assert(!strcmp("0123456789: deadbeef", da1));  	darr_free(da1); + +	sum = 0; +	*darr_append(strings) = "1"; +	*darr_append(strings) = "2"; +	*darr_append(strings) = "3"; +#define adder(x) (sum += atoi(x)) +	darr_free_func(strings, adder); +	assert(sum == 6); +	assert(strings == NULL); + +	sum = 0; +	darr_free_func(strings, adder); +	assert(sum == 0); +	assert(strings == NULL); + +	*darr_append(strings) = NULL; +	*darr_append(strings) = darr_strdup("2"); +	*darr_append(strings) = darr_strdup("3"); +	darr_free_free(strings); +	assert(strings == NULL); + +	darr_free_free(strings); +	assert(strings == NULL);  }  int main(int argc, char **argv) diff --git a/tests/topotests/lib/fe_client.py b/tests/topotests/lib/fe_client.py index a47544633b..d61bc850b4 100755 --- a/tests/topotests/lib/fe_client.py +++ b/tests/topotests/lib/fe_client.py @@ -18,6 +18,8 @@ import sys  import time  from pathlib import Path +from munet.base import Timeout +  CWD = os.path.dirname(os.path.realpath(__file__))  # This is painful but works if you have installed protobuf would be better if we @@ -80,6 +82,13 @@ GET_DATA_FLAG_EXACT = 0x4  MSG_NOTIFY_FMT = "=B7x"  NOTIFY_FIELD_RESULT_TYPE = 0 +MSG_NOTIFY_SELECT_FMT = "=B7x" + +MSG_SESSION_REQ_FMT = "=8x" + +MSG_SESSION_REPLY_FMT = "=B7x" +SESSION_REPLY_FIELD_CREATED = 0 +  #  # Native message codes  # @@ -88,6 +97,9 @@ MSG_CODE_ERROR = 0  MSG_CODE_TREE_DATA = 2  MSG_CODE_GET_DATA = 3  MSG_CODE_NOTIFY = 4 +MSG_CODE_NOTIFY_SELECT = 9 +MSG_CODE_SESSION_REQ = 10 +MSG_CODE_SESSION_REPLY = 11  msg_native_formats = {      MSG_CODE_ERROR: MSG_ERROR_FMT, @@ -95,6 +107,9 @@ msg_native_formats = {      MSG_CODE_TREE_DATA: MSG_TREE_DATA_FMT,      MSG_CODE_GET_DATA: MSG_GET_DATA_FMT,      MSG_CODE_NOTIFY: MSG_NOTIFY_FMT, +    MSG_CODE_NOTIFY_SELECT: MSG_NOTIFY_SELECT_FMT, +    MSG_CODE_SESSION_REQ: MSG_SESSION_REQ_FMT, +    MSG_CODE_SESSION_REPLY: MSG_SESSION_REPLY_FMT,  } @@ -177,27 +192,44 @@ class Session:      client_id = 1 -    def __init__(self, sock): +    def __init__(self, sock, use_protobuf):          self.sock = sock          self.next_req_id = 1 -        req = mgmt_pb2.FeMessage() -        req.register_req.client_name = "test-client" -        self.send_pb_msg(req) -        logging.debug("Sent FeRegisterReq: %s", req) +        if use_protobuf: +            req = mgmt_pb2.FeMessage() +            req.register_req.client_name = "test-client" +            self.send_pb_msg(req) +            logging.debug("Sent FeRegisterReq: %s", req) -        req = mgmt_pb2.FeMessage() -        req.session_req.create = 1 -        req.session_req.client_conn_id = Session.client_id -        Session.client_id += 1 -        self.send_pb_msg(req) -        logging.debug("Sent FeSessionReq: %s", req) +            req = mgmt_pb2.FeMessage() +            req.session_req.create = 1 +            req.session_req.client_conn_id = Session.client_id +            Session.client_id += 1 +            self.send_pb_msg(req) +            logging.debug("Sent FeSessionReq: %s", req) -        reply = self.recv_pb_msg(mgmt_pb2.FeMessage()) -        logging.debug("Received FeSessionReply: %s", repr(reply)) +            reply = self.recv_pb_msg(mgmt_pb2.FeMessage()) +            logging.debug("Received FeSessionReply: %s", repr(reply)) -        assert reply.session_reply.success -        self.sess_id = reply.session_reply.session_id +            assert reply.session_reply.success +            self.sess_id = reply.session_reply.session_id +        else: +            self.sess_id = 0 +            mdata, req_id = self.get_native_msg_header(MSG_CODE_SESSION_REQ) +            mdata += struct.pack(MSG_SESSION_REQ_FMT) +            mdata += "test-client".encode("utf-8") + b"\x00" + +            self.send_native_msg(mdata) +            logging.debug("Sent native SESSION-REQ") + +            mhdr, mfixed, mdata = self.recv_native_msg() +            if mhdr[HDR_FIELD_CODE] == MSG_CODE_SESSION_REPLY: +                logging.debug("Recv native SESSION-REQ Message: %s: %s", mfixed, mdata) +            else: +                raise Exception(f"Recv NON-SESSION-REPLY Message: {mfixed}: {mdata}") +            assert mfixed[0] +            self.sess_id = mhdr[HDR_FIELD_SESS_ID]      def close(self, clean=True):          if clean: @@ -308,17 +340,22 @@ class Session:          logging.debug("Received GET: %s: %s", mfixed, mdata)          return result -    # def subscribe(self, notif_xpath): -    #     # Create the message -    #     mdata, req_id = self.get_native_msg_header(MSG_CODE_SUBSCRIBE) -    #     mdata += struct.pack(MSG_SUBSCRIBE_FMT, MSG_FORMAT_JSON) -    #     mdata += notif_xpath.encode("utf-8") + b"\x00" +    def add_notify_select(self, replace, notif_xpaths): +        # Create the message +        mdata, req_id = self.get_native_msg_header(MSG_CODE_NOTIFY_SELECT) +        mdata += struct.pack(MSG_NOTIFY_SELECT_FMT, replace) + +        for xpath in notif_xpaths: +            mdata += xpath.encode("utf-8") + b"\x00" -    #     self.send_native_msg(mdata) -    #     logging.debug("Sent SUBSCRIBE") +        self.send_native_msg(mdata) +        logging.debug("Sent NOTIFY_SELECT")      def recv_notify(self, xpaths=None): -        while True: +        if xpaths: +            self.add_notify_select(True, xpaths) + +        for remaining in Timeout(60):              logging.debug("Waiting for Notify Message")              mhdr, mfixed, mdata = self.recv_native_msg()              if mhdr[HDR_FIELD_CODE] == MSG_CODE_NOTIFY: @@ -328,19 +365,11 @@ class Session:              vsplit = mhdr[HDR_FIELD_VSPLIT]              assert mdata[vsplit - 1] == 0 -            xpath = mdata[: vsplit - 1].decode("utf-8") -              assert mdata[-1] == 0 -            result = mdata[vsplit:-1].decode("utf-8") - -            if not xpaths: -                return result -            js = json.loads(result) -            key = [x for x in js.keys()][0] -            for xpath in xpaths: -                if key.startswith(xpath): -                    return result -            logging.debug("'%s' didn't match xpath filters", key) +            # xpath = mdata[: vsplit - 1].decode("utf-8") +            return mdata[vsplit:-1].decode("utf-8") +        else: +            raise TimeoutError("Timeout waiting for notifications")  def __parse_args(): @@ -365,6 +394,9 @@ def __parse_args():          "-q", "--query", nargs="+", metavar="XPATH", help="xpath[s] to query"      )      parser.add_argument("-s", "--server", default=MPATH, help="path to server socket") +    parser.add_argument( +        "--use-protobuf", action="store_true", help="Use protobuf when there's a choice" +    )      parser.add_argument("-v", "--verbose", action="store_true", help="Be verbose")      args = parser.parse_args() @@ -381,13 +413,15 @@ def __server_connect(spath):          logging.warn("retry server connection in .5s (%s)", os.strerror(ec))          time.sleep(0.5)      logging.info("Connected to server on %s", spath) +    # Set a timeout of 5 minutes for socket operations. +    sock.settimeout(60 * 5)      return sock  def __main():      args = __parse_args()      sock = __server_connect(Path(args.server)) -    sess = Session(sock) +    sess = Session(sock, use_protobuf=args.use_protobuf)      if args.query:          # Performa an xpath query @@ -412,8 +446,12 @@ def main():          __main()      except KeyboardInterrupt:          logging.info("Exiting") +    except TimeoutError as error: +        logging.error("Timeout: %s", error) +        sys.exit(2)      except Exception as error:          logging.error("Unexpected error exiting: %s", error, exc_info=True) +        sys.exit(1)  if __name__ == "__main__": diff --git a/tests/topotests/mgmt_notif/test_notif.py b/tests/topotests/mgmt_notif/test_notif.py index c85e7ba795..01466892a8 100644 --- a/tests/topotests/mgmt_notif/test_notif.py +++ b/tests/topotests/mgmt_notif/test_notif.py @@ -51,7 +51,7 @@ def test_frontend_notification(tgen):      check_kernel_32(r1, "11.11.11.11", 1, "") -    fe_client_path = CWD + "/../lib/fe_client.py" +    fe_client_path = CWD + "/../lib/fe_client.py --verbose"      rc, _, _ = r1.cmd_status(fe_client_path + " --help")      if rc: @@ -61,7 +61,7 @@ def test_frontend_notification(tgen):      # So we filter to avoid that, all the rest are frr-ripd:authentication-failure      # making our test deterministic      output = r1.cmd_raises( -        fe_client_path + " --listen  frr-ripd:authentication-failure" +        fe_client_path + " --listen /frr-ripd:authentication-failure"      )      jsout = json.loads(output) @@ -69,7 +69,7 @@ def test_frontend_notification(tgen):      result = json_cmp(jsout, expected)      assert result is None -    output = r1.cmd_raises(fe_client_path + " --listen") +    output = r1.cmd_raises(fe_client_path + " --use-protobuf --listen")      jsout = json.loads(output)      expected = {"frr-ripd:authentication-failure": {"interface-name": "r1-eth0"}}  | 
