diff options
| author | Jafar Al-Gharaibeh <jafar@atcorp.com> | 2024-06-13 00:20:09 -0500 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-06-13 00:20:09 -0500 | 
| commit | 2e02bd2366ebf877963802d79e66b805ccffbf4c (patch) | |
| tree | 4131dc2e92ef2678f6c1cc1de6e1dddc56d4b1bc /mgmtd | |
| parent | d8e3121cb8470fe9a934100de9170b4ef48b17a6 (diff) | |
| parent | 27e369487eb602b75ea353e8c21333bd83032a86 (diff) | |
Merge pull request #16184 from LabNConsulting/chopps/fe-notify-select
mgmtd: add notification selection to front-end API 
Diffstat (limited to 'mgmtd')
| -rw-r--r-- | mgmtd/mgmt_fe_adapter.c | 221 | 
1 files changed, 214 insertions, 7 deletions
diff --git a/mgmtd/mgmt_fe_adapter.c b/mgmtd/mgmt_fe_adapter.c index fc1bde0b38..5f53c928a4 100644 --- a/mgmtd/mgmt_fe_adapter.c +++ b/mgmtd/mgmt_fe_adapter.c @@ -43,6 +43,7 @@ struct mgmt_fe_session_ctx {  	uint64_t txn_id;  	uint64_t cfg_txn_id;  	uint8_t ds_locked[MGMTD_DS_MAX_ID]; +	const char **notify_xpaths;  	struct event *proc_cfg_txn_clnp;  	struct event *proc_show_txn_clnp; @@ -489,6 +490,26 @@ static int fe_adapter_send_get_reply(struct mgmt_fe_session_ctx *session,  	return fe_adapter_send_msg(session->adapter, &fe_msg, false);  } +static int fe_adapter_conn_send_error(struct msg_conn *conn, +				      uint64_t session_id, uint64_t req_id, +				      bool short_circuit_ok, int16_t error, +				      const char *errfmt, ...) PRINTFRR(6, 7); +static int fe_adapter_conn_send_error(struct msg_conn *conn, uint64_t session_id, +				      uint64_t req_id, bool short_circuit_ok, +				      int16_t error, const char *errfmt, ...) +{ +	va_list ap; +	int ret; + +	va_start(ap, errfmt); + +	ret = vmgmt_msg_native_send_error(conn, session_id, req_id, +					  short_circuit_ok, error, errfmt, ap); +	va_end(ap); + +	return ret; +} +  static int fe_adapter_send_error(struct mgmt_fe_session_ctx *session,  				 uint64_t req_id, bool short_circuit_ok,  				 int16_t error, const char *errfmt, ...) @@ -1169,6 +1190,88 @@ static int fe_adapter_send_edit_reply(struct mgmt_fe_session_ctx *session,  	return ret;  } +static int +fe_adapter_native_send_session_reply(struct mgmt_fe_client_adapter *adapter, +				     uint64_t req_id, uint64_t session_id, +				     bool created) +{ +	struct mgmt_msg_session_reply *msg; +	int ret; + +	msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_session_reply, 0, +					MTYPE_MSG_NATIVE_SESSION_REPLY); +	msg->refer_id = session_id; +	msg->req_id = req_id; +	msg->code = MGMT_MSG_CODE_SESSION_REPLY; +	msg->created = created; + +	__dbg("Sending session-reply from adapter %s to session-id %" PRIu64 +	      " req-id %" PRIu64 " len %u", +	      adapter->name, session_id, req_id, +	      mgmt_msg_native_get_msg_len(msg)); + +	ret = fe_adapter_send_native_msg(adapter, msg, +					 mgmt_msg_native_get_msg_len(msg), +					 false); +	mgmt_msg_native_free_msg(msg); + +	return ret; +} + +/** + * fe_adapter_handle_session_req() - Handle a session-req message from a FE client. + * @msg_raw: the message data. + * @msg_len: the length of the message data. + */ +static void fe_adapter_handle_session_req(struct mgmt_fe_client_adapter *adapter, +					  void *__msg, size_t msg_len) +{ +	struct mgmt_msg_session_req *msg = __msg; +	struct mgmt_fe_session_ctx *session; +	uint64_t client_id; + +	__dbg("Got session-req creating: %u for refer-id %" PRIu64 " from '%s'", +	      msg->refer_id == 0, msg->refer_id, adapter->name); + +	if (msg->refer_id) { +		uint64_t session_id = msg->refer_id; + +		session = mgmt_session_id2ctx(session_id); +		if (!session) { +			fe_adapter_conn_send_error( +				adapter->conn, session_id, msg->req_id, false, +				-EINVAL, +				"No session to delete for session-id: %" PRIu64, +				session_id); +			return; +		} +		fe_adapter_native_send_session_reply(adapter, msg->req_id, +						     session_id, false); +		mgmt_fe_cleanup_session(&session); +		return; +	} + +	client_id = msg->req_id; + +	/* See if we have a client name to register */ +	if (msg_len > sizeof(*msg)) { +		if (!MGMT_MSG_VALIDATE_NUL_TERM(msg, msg_len)) { +			fe_adapter_conn_send_error( +				adapter->conn, client_id, msg->req_id, false, +				-EINVAL, +				"Corrupt session-req message rcvd from client-id: %" PRIu64, +				client_id); +			return; +		} +		__dbg("Set client-name to '%s'", msg->client_name); +		strlcpy(adapter->name, msg->client_name, sizeof(adapter->name)); +	} + +	session = mgmt_fe_create_session(adapter, client_id); +	fe_adapter_native_send_session_reply(adapter, client_id, +					     session->session_id, true); +} +  /**   * fe_adapter_handle_get_data() - Handle a get-tree message from a FE client.   * @session: the client session. @@ -1402,9 +1505,44 @@ static void fe_adapter_handle_edit(struct mgmt_fe_session_ctx *session,  }  /** + * fe_adapter_handle_notify_select() - Handle an Notify Select message. + * @session: the client session. + * @__msg: the message data. + * @msg_len: the length of the message data. + */ +static void fe_adapter_handle_notify_select(struct mgmt_fe_session_ctx *session, +					    void *__msg, size_t msg_len) +{ +	struct mgmt_msg_notify_select *msg = __msg; +	uint64_t req_id = msg->req_id; +	const char **selectors = NULL; +	const char **new; + +	/* An empty message clears the selectors */ +	if (msg_len >= sizeof(*msg)) { +		selectors = mgmt_msg_native_strings_decode(msg, msg_len, +							   msg->selectors); +		if (!selectors) { +			fe_adapter_send_error(session, req_id, false, -EINVAL, +					      "Invalid message"); +			return; +		} +	} +	if (msg->replace) { +		darr_free_free(session->notify_xpaths); +		session->notify_xpaths = selectors; +	} else { +		new = darr_append_nz(session->notify_xpaths, +				     darr_len(selectors)); +		memcpy(new, selectors, darr_len(selectors) * sizeof(*selectors)); +		darr_free(selectors); +	} +} + +/**   * fe_adapter_handle_rpc() - Handle an RPC message from an FE client.   * @session: the client session. - * @msg_raw: the message data. + * @__msg: the message data.   * @msg_len: the length of the message data.   */  static void fe_adapter_handle_rpc(struct mgmt_fe_session_ctx *session, @@ -1493,6 +1631,28 @@ static void fe_adapter_handle_native_msg(struct mgmt_fe_client_adapter *adapter,  					 size_t msg_len)  {  	struct mgmt_fe_session_ctx *session; +	size_t min_size = mgmt_msg_get_min_size(msg->code); + +	if (msg_len < min_size) { +		if (!min_size) +			__log_err("adapter %s: recv msg refer-id %" PRIu64 +				  " unknown message type %u", +				  adapter->name, msg->refer_id, msg->code); +		else +			__log_err("adapter %s: recv msg refer-id %" PRIu64 +				  " short (%zu<%zu) msg for type %u", +				  adapter->name, msg->refer_id, msg_len, +				  min_size, msg->code); +		return; +	} + +	if (msg->code == MGMT_MSG_CODE_SESSION_REQ) { +		__dbg("adapter %s: session-id %" PRIu64 +		      " received SESSION_REQ message", +		      adapter->name, msg->refer_id); +		fe_adapter_handle_session_req(adapter, msg, msg_len); +		return; +	}  	session = mgmt_session_id2ctx(msg->refer_id);  	if (!session) { @@ -1503,13 +1663,26 @@ static void fe_adapter_handle_native_msg(struct mgmt_fe_client_adapter *adapter,  	assert(session->adapter == adapter);  	switch (msg->code) { -	case MGMT_MSG_CODE_GET_DATA: -		fe_adapter_handle_get_data(session, msg, msg_len); -		break;  	case MGMT_MSG_CODE_EDIT: +		__dbg("adapter %s: session-id %" PRIu64 " received EDIT message", +		      adapter->name, msg->refer_id);  		fe_adapter_handle_edit(session, msg, msg_len);  		break; +	case MGMT_MSG_CODE_NOTIFY_SELECT: +		__dbg("adapter %s: session-id %" PRIu64 +		      " received NOTIFY_SELECT message", +		      adapter->name, msg->refer_id); +		fe_adapter_handle_notify_select(session, msg, msg_len); +		break; +	case MGMT_MSG_CODE_GET_DATA: +		__dbg("adapter %s: session-id %" PRIu64 +		      " received GET_DATA message", +		      adapter->name, msg->refer_id); +		fe_adapter_handle_get_data(session, msg, msg_len); +		break;  	case MGMT_MSG_CODE_RPC: +		__dbg("adapter %s: session-id %" PRIu64 " received RPC message", +		      adapter->name, msg->refer_id);  		fe_adapter_handle_rpc(session, msg, msg_len);  		break;  	default: @@ -1554,14 +1727,48 @@ void mgmt_fe_adapter_send_notify(struct mgmt_msg_notify_data *msg, size_t msglen  {  	struct mgmt_fe_client_adapter *adapter;  	struct mgmt_fe_session_ctx *session; +	struct nb_node *nb_node; +	const char **xpath_prefix; +	const char *notif; +	bool sendit; +	uint len;  	assert(msg->refer_id == 0); +	notif = mgmt_msg_native_xpath_decode(msg, msglen); +	if (!notif) { +		__log_err("Corrupt notify msg"); +		return; +	} + +	/* +	 * We need the nb_node to obtain a path which does not include any +	 * specific list entry selectors +	 */ +	nb_node = nb_node_find(notif); +	if (!nb_node) { +		__log_err("No schema found for notification: %s", notif); +		return; +	} +  	FOREACH_ADAPTER_IN_LIST (adapter) {  		FOREACH_SESSION_IN_LIST (adapter, session) { -			msg->refer_id = session->session_id; -			(void)fe_adapter_send_native_msg(adapter, msg, msglen, -							 false); +			/* If no selectors then always send */ +			sendit = !session->notify_xpaths; +			darr_foreach_p (session->notify_xpaths, xpath_prefix) { +				len = strlen(*xpath_prefix); +				if (!strncmp(*xpath_prefix, notif, len) || +				    !strncmp(*xpath_prefix, nb_node->xpath, +					     len)) { +					sendit = true; +					break; +				} +			} +			if (sendit) { +				msg->refer_id = session->session_id; +				(void)fe_adapter_send_native_msg(adapter, msg, +								 msglen, false); +			}  		}  	}  	msg->refer_id = 0;  | 
