]> git.puffer.fish Git - mirror/frr.git/commitdiff
mgmtd: add native RPC processing
authorIgor Ryzhov <iryzhov@nfware.com>
Tue, 19 Mar 2024 19:11:59 +0000 (21:11 +0200)
committerIgor Ryzhov <iryzhov@nfware.com>
Mon, 22 Apr 2024 13:36:22 +0000 (16:36 +0300)
Signed-off-by: Igor Ryzhov <iryzhov@nfware.com>
lib/mgmt_msg_native.h
mgmtd/mgmt_be_adapter.c
mgmtd/mgmt_fe_adapter.c
mgmtd/mgmt_fe_adapter.h
mgmtd/mgmt_memory.c
mgmtd/mgmt_memory.h
mgmtd/mgmt_txn.c
mgmtd/mgmt_txn.h

index 342f6f04cd7a01782cfa2e6a2df2a4cdcf9c3721..cf528a6356383ab5d52e1391ec21ba09231d49da 100644 (file)
@@ -609,7 +609,10 @@ extern int vmgmt_msg_native_send_error(struct msg_conn *conn,
                const char *__s = NULL;                                        \
                if (msg->vsplit && msg->vsplit <= __len &&                     \
                    msg->data[msg->vsplit - 1] == 0) {                         \
-                       (__data) = msg->data + msg->vsplit;                    \
+                       if (msg->vsplit < __len)                               \
+                               (__data) = msg->data + msg->vsplit;            \
+                       else                                                   \
+                               (__data) = NULL;                               \
                        __s = msg->data;                                       \
                }                                                              \
                __s;                                                           \
index 8ce263bb23cefeb06334905992e725b1f281732e..f4fa389b879b8a5ed730c2a30d80c0a7cbe30908 100644 (file)
@@ -672,6 +672,7 @@ static void be_adapter_handle_native_msg(struct mgmt_be_client_adapter *adapter,
 {
        struct mgmt_msg_notify_data *notify_msg;
        struct mgmt_msg_tree_data *tree_msg;
+       struct mgmt_msg_rpc_reply *rpc_msg;
        struct mgmt_msg_error *error_msg;
 
        /* get the transaction */
@@ -696,6 +697,15 @@ static void be_adapter_handle_native_msg(struct mgmt_be_client_adapter *adapter,
                /* Forward the reply to the txn module */
                mgmt_txn_notify_tree_data_reply(adapter, tree_msg, msg_len);
                break;
+       case MGMT_MSG_CODE_RPC_REPLY:
+               /* RPC reply from a backend client */
+               rpc_msg = (typeof(rpc_msg))msg;
+               __dbg("Got RPC_REPLY from '%s' txn-id %" PRIx64, adapter->name,
+                     msg->refer_id);
+
+               /* Forward the reply to the txn module */
+               mgmt_txn_notify_rpc_reply(adapter, rpc_msg, msg_len);
+               break;
        case MGMT_MSG_CODE_NOTIFY:
                notify_msg = (typeof(notify_msg))msg;
                __dbg("Got NOTIFY from '%s'", adapter->name);
index 8ae473fb66fbe1183817227e2f4613833704af1b..b20e36ce0cd413c1689b623c094f9f2078d175d7 100644 (file)
@@ -1101,6 +1101,47 @@ done:
        return ret;
 }
 
+static int fe_adapter_send_rpc_reply(struct mgmt_fe_session_ctx *session,
+                                    uint64_t req_id, uint8_t result_type,
+                                    const struct lyd_node *result)
+{
+       struct mgmt_msg_rpc_reply *msg;
+       uint8_t **darrp = NULL;
+       int ret;
+
+       msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_rpc_reply, 0,
+                                       MTYPE_MSG_NATIVE_RPC_REPLY);
+       msg->refer_id = session->session_id;
+       msg->req_id = req_id;
+       msg->code = MGMT_MSG_CODE_RPC_REPLY;
+       msg->result_type = result_type;
+
+       if (result) {
+               darrp = mgmt_msg_native_get_darrp(msg);
+               ret = yang_print_tree_append(darrp, result, result_type, 0);
+               if (ret != LY_SUCCESS) {
+                       __log_err("Error building rpc-reply result for client %s session-id %" PRIu64
+                                 " req-id %" PRIu64 " result type %u",
+                                 session->adapter->name, session->session_id,
+                                 req_id, result_type);
+                       goto done;
+               }
+       }
+
+       __dbg("Sending rpc-reply from adapter %s to session-id %" PRIu64
+             " req-id %" PRIu64 " len %u",
+             session->adapter->name, session->session_id, req_id,
+             mgmt_msg_native_get_msg_len(msg));
+
+       ret = fe_adapter_send_native_msg(session->adapter, msg,
+                                        mgmt_msg_native_get_msg_len(msg),
+                                        false);
+done:
+       mgmt_msg_native_free_msg(msg);
+
+       return ret;
+}
+
 static int fe_adapter_send_edit_reply(struct mgmt_fe_session_ctx *session,
                                      uint64_t req_id, const char *xpath)
 {
@@ -1271,7 +1312,7 @@ static void fe_adapter_handle_edit(struct mgmt_fe_session_ctx *session,
        }
 
        xpath = mgmt_msg_native_xpath_data_decode(msg, msg_len, data);
-       if (!xpath || !data) {
+       if (!xpath) {
                fe_adapter_send_error(session, msg->req_id, false, -EINVAL,
                                      "Invalid message");
                return;
@@ -1360,6 +1401,96 @@ static void fe_adapter_handle_edit(struct mgmt_fe_session_ctx *session,
        }
 }
 
+/**
+ * fe_adapter_handle_rpc() - Handle an RPC message from an FE client.
+ * @session: the client session.
+ * @msg_raw: the message data.
+ * @msg_len: the length of the message data.
+ */
+static void fe_adapter_handle_rpc(struct mgmt_fe_session_ctx *session,
+                                 void *__msg, size_t msg_len)
+{
+       struct mgmt_msg_rpc *msg = __msg;
+       const struct lysc_node *snode;
+       const char *xpath, *data;
+       uint64_t req_id = msg->req_id;
+       uint64_t clients;
+       int ret;
+
+       __dbg("Received RPC request from client %s for session-id %" PRIu64
+             " req-id %" PRIu64,
+             session->adapter->name, session->session_id, msg->req_id);
+
+       xpath = mgmt_msg_native_xpath_data_decode(msg, msg_len, data);
+       if (!xpath) {
+               fe_adapter_send_error(session, req_id, false, -EINVAL,
+                                     "Invalid message");
+               return;
+       }
+
+       if (session->txn_id != MGMTD_TXN_ID_NONE) {
+               fe_adapter_send_error(session, req_id, false, -EINPROGRESS,
+                                     "Transaction in progress txn-id: %" PRIu64
+                                     " for session-id: %" PRIu64,
+                                     session->txn_id, session->session_id);
+               return;
+       }
+
+       snode = lys_find_path(ly_native_ctx, NULL, xpath, 0);
+       if (!snode) {
+               fe_adapter_send_error(session, req_id, false, -ENOENT,
+                                     "No such path: %s", xpath);
+               return;
+       }
+
+       if (snode->nodetype == LYS_RPC)
+               clients =
+                       mgmt_be_interested_clients(xpath,
+                                                  MGMT_BE_XPATH_SUBSCR_TYPE_RPC);
+       else if (snode->nodetype == LYS_ACTION)
+               clients =
+                       mgmt_be_interested_clients(xpath,
+                                                  MGMT_BE_XPATH_SUBSCR_TYPE_CFG);
+       else {
+               fe_adapter_send_error(session, req_id, false, -EINVAL,
+                                     "Not an RPC or action path: %s", xpath);
+               return;
+       }
+
+       if (!clients) {
+               __dbg("No backends implement xpath: %s for txn-id: %" PRIu64
+                     " session-id: %" PRIu64,
+                     xpath, session->txn_id, session->session_id);
+
+               fe_adapter_send_error(session, req_id, false, -ENOENT,
+                                     "No backends implement xpath: %s", xpath);
+               return;
+       }
+
+       /* Start a RPC Transaction */
+       session->txn_id = mgmt_create_txn(session->session_id,
+                                         MGMTD_TXN_TYPE_RPC);
+       if (session->txn_id == MGMTD_SESSION_ID_NONE) {
+               fe_adapter_send_error(session, req_id, false, -EINPROGRESS,
+                                     "Failed to create an RPC transaction");
+               return;
+       }
+
+       __dbg("Created new rpc txn-id: %" PRIu64 " for session-id: %" PRIu64,
+             session->txn_id, session->session_id);
+
+       /* Create an RPC request under the transaction */
+       ret = mgmt_txn_send_rpc(session->txn_id, req_id, clients,
+                               msg->request_type, xpath, data,
+                               mgmt_msg_native_data_len_decode(msg, msg_len));
+       if (ret) {
+               /* destroy the just created txn */
+               mgmt_destroy_txn(&session->txn_id);
+               fe_adapter_send_error(session, req_id, false, -EINPROGRESS,
+                                     "Failed to create an RPC transaction");
+       }
+}
+
 /**
  * Handle a native encoded message from the FE client.
  */
@@ -1384,6 +1515,9 @@ static void fe_adapter_handle_native_msg(struct mgmt_fe_client_adapter *adapter,
        case MGMT_MSG_CODE_EDIT:
                fe_adapter_handle_edit(session, msg, msg_len);
                break;
+       case MGMT_MSG_CODE_RPC:
+               fe_adapter_handle_rpc(session, msg, msg_len);
+               break;
        default:
                __log_err("unknown native message session-id %" PRIu64
                          " req-id %" PRIu64 " code %u to FE adapter %s",
@@ -1623,6 +1757,24 @@ int mgmt_fe_adapter_send_tree_data(uint64_t session_id, uint64_t txn_id,
        return ret;
 }
 
+int mgmt_fe_adapter_send_rpc_reply(uint64_t session_id, uint64_t txn_id,
+                                  uint64_t req_id, LYD_FORMAT result_type,
+                                  const struct lyd_node *result)
+{
+       struct mgmt_fe_session_ctx *session;
+       int ret;
+
+       session = mgmt_session_id2ctx(session_id);
+       if (!session || session->txn_id != txn_id)
+               return -1;
+
+       ret = fe_adapter_send_rpc_reply(session, req_id, result_type, result);
+
+       mgmt_destroy_txn(&session->txn_id);
+
+       return ret;
+}
+
 int mgmt_fe_adapter_send_edit_reply(uint64_t session_id, uint64_t txn_id,
                                    uint64_t req_id, bool unlock, bool commit,
                                    const char *xpath, int16_t error,
index 8d61ffe9106bf724a88a0cec8d222b6bcc0f1dba..61d6cfae13aa399f0e3fcdc03cca7095e84901b5 100644 (file)
@@ -162,6 +162,26 @@ mgmt_fe_adapter_send_tree_data(uint64_t session_id, uint64_t txn_id,
                               uint32_t wd_options, const struct lyd_node *tree,
                               int partial_error, bool short_circuit_ok);
 
+/**
+ * Send RPC reply back to client.
+ *
+ * This also cleans up and frees the transaction.
+ *
+ * Args:
+ *     session_id: the session.
+ *     txn_id: the txn_id this data pertains to
+ *     req_id: the req id for the rpc message
+ *     result_type: the format of the result data.
+ *     result: the results.
+ *
+ * Return:
+ *     the return value from the underlying send function.
+ */
+extern int mgmt_fe_adapter_send_rpc_reply(uint64_t session_id, uint64_t txn_id,
+                                         uint64_t req_id,
+                                         LYD_FORMAT result_type,
+                                         const struct lyd_node *result);
+
 /**
  * Send edit reply back to client. If error is not 0, a native error is sent.
  *
index 0fce61aa97923c35e1b471e1fbd19b521ec2b4b2..72ccca0626707450e20f8cf84c3bdbcc34f99ab5 100644 (file)
@@ -20,6 +20,7 @@
 DEFINE_MGROUP(MGMTD, "mgmt");
 DEFINE_MTYPE(MGMTD, MGMTD, "instance");
 DEFINE_MTYPE(MGMTD, MGMTD_XPATH, "xpath regex");
+DEFINE_MTYPE(MGMTD, MGMTD_ERR, "error");
 DEFINE_MTYPE(MGMTD, MGMTD_BE_ADPATER, "backend adapter");
 DEFINE_MTYPE(MGMTD, MGMTD_FE_ADPATER, "frontend adapter");
 DEFINE_MTYPE(MGMTD, MGMTD_FE_SESSION, "frontend session");
@@ -30,5 +31,6 @@ DEFINE_MTYPE(MGMTD, MGMTD_TXN_COMMCFG_REQ, "txn commit-config requests");
 DEFINE_MTYPE(MGMTD, MGMTD_TXN_GETDATA_REQ, "txn get-data requests");
 DEFINE_MTYPE(MGMTD, MGMTD_TXN_GETDATA_REPLY, "txn get-data replies");
 DEFINE_MTYPE(MGMTD, MGMTD_TXN_GETTREE_REQ, "txn get-tree requests");
+DEFINE_MTYPE(MGMTD, MGMTD_TXN_RPC_REQ, "txn rpc requests");
 DEFINE_MTYPE(MGMTD, MGMTD_TXN_CFG_BATCH, "txn config batches");
 DEFINE_MTYPE(MGMTD, MGMTD_CMT_INFO, "commit info");
index d5b6aa632ea3a31e6c5a980d7dea6d7b6d96d345..e28586ed830d5aae5984ed1c029f2b2a16090a48 100644 (file)
@@ -14,6 +14,7 @@
 DECLARE_MGROUP(MGMTD);
 DECLARE_MTYPE(MGMTD);
 DECLARE_MTYPE(MGMTD_XPATH);
+DECLARE_MTYPE(MGMTD_ERR);
 DECLARE_MTYPE(MGMTD_BE_ADPATER);
 DECLARE_MTYPE(MGMTD_FE_ADPATER);
 DECLARE_MTYPE(MGMTD_FE_SESSION);
@@ -24,6 +25,7 @@ DECLARE_MTYPE(MGMTD_TXN_COMMCFG_REQ);
 DECLARE_MTYPE(MGMTD_TXN_GETDATA_REQ);
 DECLARE_MTYPE(MGMTD_TXN_GETDATA_REPLY);
 DECLARE_MTYPE(MGMTD_TXN_GETTREE_REQ);
+DECLARE_MTYPE(MGMTD_TXN_RPC_REQ);
 DECLARE_MTYPE(MGMTD_TXN_CFG_BATCH);
 DECLARE_MTYPE(MGMTD_BE_ADAPTER_MSG_BUF);
 DECLARE_MTYPE(MGMTD_CMT_INFO);
index 75abca1cff92125e56153278d627da378361160c..25376c16576f0cb12dfe9eaaa99633a53a78bc8b 100644 (file)
@@ -29,6 +29,7 @@ enum mgmt_txn_event {
        MGMTD_TXN_PROC_COMMITCFG,
        MGMTD_TXN_PROC_GETCFG,
        MGMTD_TXN_PROC_GETTREE,
+       MGMTD_TXN_PROC_RPC,
        MGMTD_TXN_COMMITCFG_TIMEOUT,
 };
 
@@ -188,6 +189,15 @@ struct txn_req_get_tree {
        struct lyd_node *client_results; /* result tree from clients */
 };
 
+struct txn_req_rpc {
+       char *xpath;           /* xpath of rpc/action to invoke */
+       uint64_t sent_clients; /* Bitmask of clients sent req to */
+       uint64_t recv_clients; /* Bitmask of clients recv reply from */
+       uint8_t result_type;   /* LYD_FORMAT for results */
+       char *errstr;          /* error string */
+       struct lyd_node *client_results; /* result tree from clients */
+};
+
 struct mgmt_txn_req {
        struct mgmt_txn_ctx *txn;
        enum mgmt_txn_event req_event;
@@ -196,6 +206,7 @@ struct mgmt_txn_req {
                struct mgmt_set_cfg_req *set_cfg;
                struct mgmt_get_data_req *get_data;
                struct txn_req_get_tree *get_tree;
+               struct txn_req_rpc *rpc;
                struct mgmt_commit_cfg_req commit_cfg;
        } req;
 
@@ -221,6 +232,7 @@ struct mgmt_txn_ctx {
        struct event *proc_get_tree;
        struct event *comm_cfg_timeout;
        struct event *get_tree_timeout;
+       struct event *rpc_timeout;
        struct event *clnup;
 
        /* List of backend adapters involved in this transaction */
@@ -252,6 +264,10 @@ struct mgmt_txn_ctx {
         * List of pending get-tree requests.
         */
        struct mgmt_txn_reqs_head get_tree_reqs;
+       /*
+        * List of pending rpc requests.
+        */
+       struct mgmt_txn_reqs_head rpc_reqs;
        /*
         * There will always be one commit-config allowed for a given
         * transaction/session. No need to maintain lists for it.
@@ -416,6 +432,15 @@ static struct mgmt_txn_req *mgmt_txn_req_alloc(struct mgmt_txn_ctx *txn,
                      " session-id: %" PRIu64,
                      txn_req->req_id, txn->txn_id, txn->session_id);
                break;
+       case MGMTD_TXN_PROC_RPC:
+               txn_req->req.rpc = XCALLOC(MTYPE_MGMTD_TXN_RPC_REQ,
+                                          sizeof(struct txn_req_rpc));
+               assert(txn_req->req.rpc);
+               mgmt_txn_reqs_add_tail(&txn->rpc_reqs, txn_req);
+               __dbg("Added a new RPC req-id: %" PRIu64 " txn-id: %" PRIu64
+                     " session-id: %" PRIu64,
+                     txn_req->req_id, txn->txn_id, txn->session_id);
+               break;
        case MGMTD_TXN_COMMITCFG_TIMEOUT:
                break;
        }
@@ -506,6 +531,15 @@ static void mgmt_txn_req_free(struct mgmt_txn_req **txn_req)
                XFREE(MTYPE_MGMTD_XPATH, (*txn_req)->req.get_tree->xpath);
                XFREE(MTYPE_MGMTD_TXN_GETTREE_REQ, (*txn_req)->req.get_tree);
                break;
+       case MGMTD_TXN_PROC_RPC:
+               __dbg("Deleting RPC req-id: %" PRIu64 " txn-id: %" PRIu64,
+                     (*txn_req)->req_id, (*txn_req)->txn->txn_id);
+               req_list = &(*txn_req)->txn->rpc_reqs;
+               lyd_free_all((*txn_req)->req.rpc->client_results);
+               XFREE(MTYPE_MGMTD_ERR, (*txn_req)->req.rpc->errstr);
+               XFREE(MTYPE_MGMTD_XPATH, (*txn_req)->req.rpc->xpath);
+               XFREE(MTYPE_MGMTD_TXN_RPC_REQ, (*txn_req)->req.rpc);
+               break;
        case MGMTD_TXN_COMMITCFG_TIMEOUT:
                break;
        }
@@ -1308,6 +1342,33 @@ static int txn_get_tree_data_done(struct mgmt_txn_ctx *txn,
        return ret;
 }
 
+static int txn_rpc_done(struct mgmt_txn_ctx *txn, struct mgmt_txn_req *txn_req)
+{
+       struct txn_req_rpc *rpc = txn_req->req.rpc;
+       uint64_t req_id = txn_req->req_id;
+
+       /* cancel timer and send reply onward */
+       EVENT_OFF(txn->rpc_timeout);
+
+       if (rpc->errstr)
+               mgmt_fe_adapter_txn_error(txn->txn_id, req_id, false, -1,
+                                         rpc->errstr);
+       else if (mgmt_fe_adapter_send_rpc_reply(txn->session_id, txn->txn_id,
+                                               req_id, rpc->result_type,
+                                               rpc->client_results)) {
+               __log_err("Error sending the results of RPC for txn-id %" PRIu64
+                         " req_id %" PRIu64 " to requested type %u",
+                         txn->txn_id, req_id, rpc->result_type);
+
+               (void)mgmt_fe_adapter_txn_error(txn->txn_id, req_id, false, -1,
+                                               "Error converting results of RPC");
+       }
+
+       /* we're done with the request */
+       mgmt_txn_req_free(&txn_req);
+
+       return 0;
+}
 
 static void txn_get_tree_timeout(struct event *thread)
 {
@@ -1335,6 +1396,31 @@ static void txn_get_tree_timeout(struct event *thread)
        txn_get_tree_data_done(txn, txn_req);
 }
 
+static void txn_rpc_timeout(struct event *thread)
+{
+       struct mgmt_txn_ctx *txn;
+       struct mgmt_txn_req *txn_req;
+
+       txn_req = (struct mgmt_txn_req *)EVENT_ARG(thread);
+       txn = txn_req->txn;
+
+       assert(txn);
+       assert(txn->type == MGMTD_TXN_TYPE_RPC);
+
+       __log_err("Backend timeout txn-id: %" PRIu64 " ending rpc", txn->txn_id);
+
+       /*
+        * Send a get-tree data reply.
+        *
+        * NOTE: The transaction cleanup will be triggered from Front-end
+        * adapter.
+        */
+
+       txn_req->req.rpc->errstr =
+               XSTRDUP(MTYPE_MGMTD_ERR, "Operation on the backend timed-out");
+       txn_rpc_done(txn, txn_req);
+}
+
 /*
  * Send CFG_APPLY_REQs to all the backend client.
  *
@@ -1518,6 +1604,7 @@ static void mgmt_txn_send_getcfg_reply_data(struct mgmt_txn_req *txn_req,
        case MGMTD_TXN_PROC_SETCFG:
        case MGMTD_TXN_PROC_COMMITCFG:
        case MGMTD_TXN_PROC_GETTREE:
+       case MGMTD_TXN_PROC_RPC:
        case MGMTD_TXN_COMMITCFG_TIMEOUT:
                __log_err("Invalid Txn-Req-Event %u", txn_req->req_event);
                break;
@@ -1723,6 +1810,7 @@ static struct mgmt_txn_ctx *mgmt_txn_create_new(uint64_t session_id,
                mgmt_txn_reqs_init(&txn->set_cfg_reqs);
                mgmt_txn_reqs_init(&txn->get_cfg_reqs);
                mgmt_txn_reqs_init(&txn->get_tree_reqs);
+               mgmt_txn_reqs_init(&txn->rpc_reqs);
                txn->commit_cfg_req = NULL;
                txn->refcount = 0;
                if (!mgmt_txn_mm->next_txn_id)
@@ -1892,6 +1980,7 @@ static void mgmt_txn_register_event(struct mgmt_txn_ctx *txn,
                                &txn->comm_cfg_timeout);
                break;
        case MGMTD_TXN_PROC_GETTREE:
+       case MGMTD_TXN_PROC_RPC:
                assert(!"code bug do not register this event");
                break;
        }
@@ -2498,6 +2587,64 @@ reply:
        return 0;
 }
 
+int mgmt_txn_send_rpc(uint64_t txn_id, uint64_t req_id, uint64_t clients,
+                     LYD_FORMAT result_type, const char *xpath,
+                     const char *data, size_t data_len)
+{
+       struct mgmt_txn_ctx *txn;
+       struct mgmt_txn_req *txn_req;
+       struct mgmt_msg_rpc *msg;
+       struct txn_req_rpc *rpc;
+       uint64_t id;
+       int ret;
+
+       txn = mgmt_txn_id2ctx(txn_id);
+       if (!txn)
+               return -1;
+
+       txn_req = mgmt_txn_req_alloc(txn, req_id, MGMTD_TXN_PROC_RPC);
+       rpc = txn_req->req.rpc;
+       rpc->xpath = XSTRDUP(MTYPE_MGMTD_XPATH, xpath);
+       rpc->result_type = result_type;
+
+       msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_rpc, 0,
+                                       MTYPE_MSG_NATIVE_RPC);
+       msg->refer_id = txn_id;
+       msg->req_id = req_id;
+       msg->code = MGMT_MSG_CODE_RPC;
+       msg->request_type = result_type;
+
+       mgmt_msg_native_xpath_encode(msg, xpath);
+       if (data)
+               mgmt_msg_native_append(msg, data, data_len);
+
+       assert(clients);
+       FOREACH_BE_CLIENT_BITS (id, clients) {
+               ret = mgmt_be_send_native(id, msg);
+               if (ret) {
+                       __log_err("Could not send rpc message to backend client %s",
+                                 mgmt_be_client_id2name(id));
+                       continue;
+               }
+
+               __dbg("Sent rpc req to backend client %s",
+                     mgmt_be_client_id2name(id));
+
+               /* record that we sent the request to the client */
+               rpc->sent_clients |= (1u << id);
+       }
+
+       mgmt_msg_native_free_msg(msg);
+
+       if (!rpc->sent_clients)
+               return txn_rpc_done(txn, txn_req);
+
+       event_add_timer(mgmt_txn_tm, txn_rpc_timeout, txn_req,
+                       MGMTD_TXN_RPC_MAX_DELAY_SEC, &txn->rpc_timeout);
+
+       return 0;
+}
+
 /*
  * Error reply from the backend client.
  */
@@ -2508,6 +2655,7 @@ int mgmt_txn_notify_error(struct mgmt_be_client_adapter *adapter,
        enum mgmt_be_client_id id = adapter->id;
        struct mgmt_txn_ctx *txn = mgmt_txn_id2ctx(txn_id);
        struct txn_req_get_tree *get_tree;
+       struct txn_req_rpc *rpc;
        struct mgmt_txn_req *txn_req;
 
        if (!txn) {
@@ -2520,6 +2668,10 @@ int mgmt_txn_notify_error(struct mgmt_be_client_adapter *adapter,
        FOREACH_TXN_REQ_IN_LIST (&txn->get_tree_reqs, txn_req)
                if (txn_req->req_id == req_id)
                        break;
+       if (!txn_req)
+               FOREACH_TXN_REQ_IN_LIST (&txn->rpc_reqs, txn_req)
+                       if (txn_req->req_id == req_id)
+                               break;
        if (!txn_req) {
                __log_err("Error reply from %s for txn-id %" PRIu64
                          " cannot find req_id %" PRIu64,
@@ -2540,6 +2692,15 @@ int mgmt_txn_notify_error(struct mgmt_be_client_adapter *adapter,
                if (get_tree->recv_clients != get_tree->sent_clients)
                        return 0;
                return txn_get_tree_data_done(txn, txn_req);
+       case MGMTD_TXN_PROC_RPC:
+               rpc = txn_req->req.rpc;
+               rpc->recv_clients |= (1u << id);
+               rpc->errstr = XSTRDUP(MTYPE_MGMTD_ERR, errstr);
+
+               /* check if done yet */
+               if (rpc->recv_clients != rpc->sent_clients)
+                       return 0;
+               return txn_rpc_done(txn, txn_req);
 
        /* non-native message events */
        case MGMTD_TXN_PROC_SETCFG:
@@ -2627,6 +2788,64 @@ int mgmt_txn_notify_tree_data_reply(struct mgmt_be_client_adapter *adapter,
        return txn_get_tree_data_done(txn, txn_req);
 }
 
+int mgmt_txn_notify_rpc_reply(struct mgmt_be_client_adapter *adapter,
+                             struct mgmt_msg_rpc_reply *reply_msg,
+                             size_t msg_len)
+{
+       uint64_t txn_id = reply_msg->refer_id;
+       uint64_t req_id = reply_msg->req_id;
+       enum mgmt_be_client_id id = adapter->id;
+       struct mgmt_txn_ctx *txn = mgmt_txn_id2ctx(txn_id);
+       struct mgmt_txn_req *txn_req;
+       struct txn_req_rpc *rpc;
+       size_t data_len = msg_len - sizeof(*reply_msg);
+       LY_ERR err;
+
+       if (!txn) {
+               __log_err("RPC reply from %s for a missing txn-id %" PRIu64,
+                         adapter->name, txn_id);
+               return -1;
+       }
+
+       /* Find the request. */
+       FOREACH_TXN_REQ_IN_LIST (&txn->rpc_reqs, txn_req)
+               if (txn_req->req_id == req_id)
+                       break;
+       if (!txn_req) {
+               __log_err("RPC reply from %s for txn-id %" PRIu64
+                         " missing req_id %" PRIu64,
+                         adapter->name, txn_id, req_id);
+               return -1;
+       }
+
+       rpc = txn_req->req.rpc;
+
+       /* we don't expect more than one daemon to provide output for an RPC */
+       if (!rpc->client_results && data_len > 0) {
+               err = yang_parse_rpc(rpc->xpath, reply_msg->result_type,
+                                    reply_msg->data, true,
+                                    &rpc->client_results);
+               if (err) {
+                       __log_err("RPC reply from %s for txn-id %" PRIu64
+                                 " req_id %" PRIu64
+                                 " error parsing result of type %u",
+                                 adapter->name, txn_id, req_id,
+                                 reply_msg->result_type);
+                       rpc->errstr =
+                               XSTRDUP(MTYPE_MGMTD_ERR,
+                                       "Cannot parse result from the backend");
+               }
+       }
+
+       rpc->recv_clients |= (1u << id);
+
+       /* check if done yet */
+       if (rpc->recv_clients != rpc->sent_clients)
+               return 0;
+
+       return txn_rpc_done(txn, txn_req);
+}
+
 void mgmt_txn_status_write(struct vty *vty)
 {
        struct mgmt_txn_ctx *txn;
index aeb74469f141f33afe293c233ce71e897daccba3..b6ca288675b75e0ed88ec932594804765e74aa38 100644 (file)
@@ -21,6 +21,7 @@
 
 #define MGMTD_TXN_CFG_COMMIT_MAX_DELAY_SEC 600
 #define MGMTD_TXN_GET_TREE_MAX_DELAY_SEC   600
+#define MGMTD_TXN_RPC_MAX_DELAY_SEC       60
 
 #define MGMTD_TXN_CLEANUP_DELAY_USEC 10
 
@@ -48,7 +49,8 @@ struct mgmt_edit_req;
 enum mgmt_txn_type {
        MGMTD_TXN_TYPE_NONE = 0,
        MGMTD_TXN_TYPE_CONFIG,
-       MGMTD_TXN_TYPE_SHOW
+       MGMTD_TXN_TYPE_SHOW,
+       MGMTD_TXN_TYPE_RPC,
 };
 
 static inline const char *mgmt_txn_type2str(enum mgmt_txn_type type)
@@ -60,6 +62,8 @@ static inline const char *mgmt_txn_type2str(enum mgmt_txn_type type)
                return "CONFIG";
        case MGMTD_TXN_TYPE_SHOW:
                return "SHOW";
+       case MGMTD_TXN_TYPE_RPC:
+               return "RPC";
        }
 
        return "Unknown";
@@ -246,6 +250,25 @@ mgmt_txn_send_edit(uint64_t txn_id, uint64_t req_id, Mgmtd__DatastoreId ds_id,
                   LYD_FORMAT request_type, uint8_t flags, uint8_t operation,
                   const char *xpath, const char *data);
 
+/**
+ * Send RPC request.
+ *
+ * Args:
+ *     txn_id: Transaction identifier.
+ *     req_id: FE client request identifier.
+ *     clients: Bitmask of clients to send RPC to.
+ *     result_type: LYD_FORMAT result format.
+ *     xpath: The xpath of the RPC.
+ *     data: The input parameters data tree.
+ *     data_len: The length of the input parameters data.
+ *
+ * Return:
+ *     0 on success.
+ */
+extern int mgmt_txn_send_rpc(uint64_t txn_id, uint64_t req_id, uint64_t clients,
+                            LYD_FORMAT result_type, const char *xpath,
+                            const char *data, size_t data_len);
+
 /*
  * Notifiy backend adapter on connection.
  */
@@ -312,6 +335,18 @@ extern int mgmt_txn_notify_tree_data_reply(struct mgmt_be_client_adapter *adapte
                                           struct mgmt_msg_tree_data *data_msg,
                                           size_t msg_len);
 
+/**
+ * Process a reply from a backend client to our RPC request
+ *
+ * Args:
+ *     adapter: The adapter that received the result.
+ *     reply_msg: The message from the backend.
+ *     msg_len: Total length of the message.
+ */
+extern int mgmt_txn_notify_rpc_reply(struct mgmt_be_client_adapter *adapter,
+                                    struct mgmt_msg_rpc_reply *reply_msg,
+                                    size_t msg_len);
+
 /*
  * Dump transaction status to vty.
  */