diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/mgmt.proto | 32 | ||||
| -rw-r--r-- | lib/mgmt_be_client.c | 1490 | ||||
| -rw-r--r-- | lib/mgmt_be_client.h | 273 | ||||
| -rw-r--r-- | lib/northbound.c | 299 | ||||
| -rw-r--r-- | lib/northbound.h | 86 | ||||
| -rw-r--r-- | lib/northbound_cli.c | 70 | ||||
| -rw-r--r-- | lib/northbound_confd.c | 3 | ||||
| -rw-r--r-- | lib/northbound_grpc.cpp | 3 | ||||
| -rw-r--r-- | lib/northbound_sysrepo.c | 3 | ||||
| -rw-r--r-- | lib/subdir.am | 2 |
10 files changed, 2137 insertions, 124 deletions
diff --git a/lib/mgmt.proto b/lib/mgmt.proto index 4376794105..8a11ff0fa5 100644 --- a/lib/mgmt.proto +++ b/lib/mgmt.proto @@ -106,18 +106,6 @@ message BeCfgDataCreateReply { optional string error_if_any = 4; } -message BeCfgDataValidateReq { - required uint64 txn_id = 1; - repeated uint64 batch_ids = 2; -} - -message BeCfgDataValidateReply { - required uint64 txn_id = 1; - repeated uint64 batch_ids = 2; - required bool success = 3; - optional string error_if_any = 4; -} - message BeCfgDataApplyReq { required uint64 txn_id = 1; } @@ -181,17 +169,15 @@ message BeMessage { BeTxnReply txn_reply = 5; BeCfgDataCreateReq cfg_data_req = 6; BeCfgDataCreateReply cfg_data_reply = 7; - BeCfgDataValidateReq cfg_validate_req = 8; - BeCfgDataValidateReply cfg_validate_reply = 9; - BeCfgDataApplyReq cfg_apply_req = 10; - BeCfgDataApplyReply cfg_apply_reply = 11; - BeOperDataGetReq get_req = 12; - BeOperDataGetReply get_reply = 13; - BeOperDataNotify notify_data = 14; - BeConfigCmdReq cfg_cmd_req = 15; - BeConfigCmdReply cfg_cmd_reply = 16; - BeShowCmdReq show_cmd_req = 17; - BeShowCmdReply show_cmd_reply = 18; + BeCfgDataApplyReq cfg_apply_req = 8; + BeCfgDataApplyReply cfg_apply_reply = 9; + BeOperDataGetReq get_req = 10; + BeOperDataGetReply get_reply = 11; + BeOperDataNotify notify_data = 12; + BeConfigCmdReq cfg_cmd_req = 13; + BeConfigCmdReply cfg_cmd_reply = 14; + BeShowCmdReq show_cmd_req = 15; + BeShowCmdReply show_cmd_reply = 16; } } diff --git a/lib/mgmt_be_client.c b/lib/mgmt_be_client.c new file mode 100644 index 0000000000..d86f3d336a --- /dev/null +++ b/lib/mgmt_be_client.c @@ -0,0 +1,1490 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Library api interfaces + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#include <zebra.h> +#include "libfrr.h" +#include "mgmtd/mgmt.h" +#include "mgmt_be_client.h" +#include "mgmt_pb.h" +#include "network.h" +#include "stream.h" +#include "sockopt.h" + +#ifdef REDIRECT_DEBUG_TO_STDERR +#define MGMTD_BE_CLIENT_DBG(fmt, ...) \ + fprintf(stderr, "%s: " fmt "\n", __func__, ##__VA_ARGS__) +#define MGMTD_BE_CLIENT_ERR(fmt, ...) \ + fprintf(stderr, "%s: ERROR, " fmt "\n", __func__, ##__VA_ARGS__) +#else /* REDIRECT_DEBUG_TO_STDERR */ +#define MGMTD_BE_CLIENT_DBG(fmt, ...) \ + do { \ + if (mgmt_debug_be_client) \ + zlog_debug("%s: " fmt, __func__, ##__VA_ARGS__); \ + } while (0) +#define MGMTD_BE_CLIENT_ERR(fmt, ...) \ + zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) +#endif /* REDIRECT_DEBUG_TO_STDERR */ + +DEFINE_MTYPE_STATIC(LIB, MGMTD_BE_BATCH, + "MGMTD backend transaction batch data"); +DEFINE_MTYPE_STATIC(LIB, MGMTD_BE_TXN, "MGMTD backend transaction data"); + +enum mgmt_be_txn_event { + MGMTD_BE_TXN_PROC_SETCFG = 1, + MGMTD_BE_TXN_PROC_GETCFG, + MGMTD_BE_TXN_PROC_GETDATA +}; + +struct mgmt_be_set_cfg_req { + struct nb_cfg_change cfg_changes[MGMTD_MAX_CFG_CHANGES_IN_BATCH]; + uint16_t num_cfg_changes; +}; + +struct mgmt_be_get_data_req { + char *xpaths[MGMTD_MAX_NUM_DATA_REQ_IN_BATCH]; + uint16_t num_xpaths; +}; + +struct mgmt_be_txn_req { + enum mgmt_be_txn_event event; + union { + struct mgmt_be_set_cfg_req set_cfg; + struct mgmt_be_get_data_req get_data; + } req; +}; + +PREDECL_LIST(mgmt_be_batches); +struct mgmt_be_batch_ctx { + /* Batch-Id as assigned by MGMTD */ + uint64_t batch_id; + + struct mgmt_be_txn_req txn_req; + + uint32_t flags; + + struct mgmt_be_batches_item list_linkage; +}; +#define MGMTD_BE_BATCH_FLAGS_CFG_PREPARED (1U << 0) +#define MGMTD_BE_TXN_FLAGS_CFG_APPLIED (1U << 1) +DECLARE_LIST(mgmt_be_batches, struct mgmt_be_batch_ctx, list_linkage); + +struct mgmt_be_client_ctx; + +PREDECL_LIST(mgmt_be_txns); +struct mgmt_be_txn_ctx { + /* Txn-Id as assigned by MGMTD */ + uint64_t txn_id; + uint32_t flags; + + struct mgmt_be_client_txn_ctx client_data; + struct mgmt_be_client_ctx *client_ctx; + + /* List of batches belonging to this transaction */ + struct mgmt_be_batches_head cfg_batches; + struct mgmt_be_batches_head apply_cfgs; + + struct mgmt_be_txns_item list_linkage; + + struct nb_transaction *nb_txn; + uint32_t nb_txn_id; +}; +#define MGMTD_BE_TXN_FLAGS_CFGPREP_FAILED (1U << 1) + +DECLARE_LIST(mgmt_be_txns, struct mgmt_be_txn_ctx, list_linkage); + +#define FOREACH_BE_TXN_BATCH_IN_LIST(txn, batch) \ + frr_each_safe (mgmt_be_batches, &(txn)->cfg_batches, (batch)) + +#define FOREACH_BE_APPLY_BATCH_IN_LIST(txn, batch) \ + frr_each_safe (mgmt_be_batches, &(txn)->apply_cfgs, (batch)) + +struct mgmt_be_client_ctx { + int conn_fd; + struct thread_master *tm; + struct thread *conn_retry_tmr; + struct thread *conn_read_ev; + struct thread *conn_write_ev; + struct thread *conn_writes_on; + struct thread *msg_proc_ev; + uint32_t flags; + uint32_t num_msg_tx; + uint32_t num_msg_rx; + + struct stream_fifo *ibuf_fifo; + struct stream *ibuf_work; + struct stream_fifo *obuf_fifo; + struct stream *obuf_work; + uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN]; + + struct nb_config *candidate_config; + struct nb_config *running_config; + + unsigned long num_batch_find; + unsigned long avg_batch_find_tm; + unsigned long num_edit_nb_cfg; + unsigned long avg_edit_nb_cfg_tm; + unsigned long num_prep_nb_cfg; + unsigned long avg_prep_nb_cfg_tm; + unsigned long num_apply_nb_cfg; + unsigned long avg_apply_nb_cfg_tm; + + struct mgmt_be_txns_head txn_head; + struct mgmt_be_client_params client_params; +}; + +#define MGMTD_BE_CLIENT_FLAGS_WRITES_OFF (1U << 0) + +#define FOREACH_BE_TXN_IN_LIST(client_ctx, txn) \ + frr_each_safe (mgmt_be_txns, &(client_ctx)->txn_head, (txn)) + +static bool mgmt_debug_be_client; + +static struct mgmt_be_client_ctx mgmt_be_client_ctx = {0}; + +const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = { +#if 0 +#ifdef HAVE_STATICD + [MGMTD_BE_CLIENT_ID_STATICD] = "staticd", +#endif +#endif + [MGMTD_BE_CLIENT_ID_MAX] = "Unknown/Invalid", +}; + +/* Forward declarations */ +static void +mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, + enum mgmt_be_event event); +static void +mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx, + unsigned long intvl_secs); +static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, + Mgmtd__BeMessage *be_msg); + +static void +mgmt_be_server_disconnect(struct mgmt_be_client_ctx *client_ctx, + bool reconnect) +{ + /* Notify client through registered callback (if any) */ + if (client_ctx->client_params.client_connect_notify) + (void)(*client_ctx->client_params + .client_connect_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, false); + + if (client_ctx->conn_fd) { + close(client_ctx->conn_fd); + client_ctx->conn_fd = 0; + } + + if (reconnect) + mgmt_be_client_schedule_conn_retry( + client_ctx, + client_ctx->client_params.conn_retry_intvl_sec); +} + +static struct mgmt_be_batch_ctx * +mgmt_be_find_batch_by_id(struct mgmt_be_txn_ctx *txn, + uint64_t batch_id) +{ + struct mgmt_be_batch_ctx *batch = NULL; + + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + if (batch->batch_id == batch_id) + return batch; + } + + return NULL; +} + +static struct mgmt_be_batch_ctx * +mgmt_be_batch_create(struct mgmt_be_txn_ctx *txn, uint64_t batch_id) +{ + struct mgmt_be_batch_ctx *batch = NULL; + + batch = mgmt_be_find_batch_by_id(txn, batch_id); + if (!batch) { + batch = XCALLOC(MTYPE_MGMTD_BE_BATCH, + sizeof(struct mgmt_be_batch_ctx)); + assert(batch); + + batch->batch_id = batch_id; + mgmt_be_batches_add_tail(&txn->cfg_batches, batch); + + MGMTD_BE_CLIENT_DBG("Added new batch 0x%llx to transaction", + (unsigned long long)batch_id); + } + + return batch; +} + +static void mgmt_be_batch_delete(struct mgmt_be_txn_ctx *txn, + struct mgmt_be_batch_ctx **batch) +{ + uint16_t indx; + + if (!batch) + return; + + mgmt_be_batches_del(&txn->cfg_batches, *batch); + if ((*batch)->txn_req.event == MGMTD_BE_TXN_PROC_SETCFG) { + for (indx = 0; indx < MGMTD_MAX_CFG_CHANGES_IN_BATCH; indx++) { + if ((*batch)->txn_req.req.set_cfg.cfg_changes[indx] + .value) { + free((char *)(*batch) + ->txn_req.req.set_cfg + .cfg_changes[indx] + .value); + } + } + } + + XFREE(MTYPE_MGMTD_BE_BATCH, *batch); + *batch = NULL; +} + +static void mgmt_be_cleanup_all_batches(struct mgmt_be_txn_ctx *txn) +{ + struct mgmt_be_batch_ctx *batch = NULL; + + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + mgmt_be_batch_delete(txn, &batch); + } + + FOREACH_BE_APPLY_BATCH_IN_LIST (txn, batch) { + mgmt_be_batch_delete(txn, &batch); + } +} + +static struct mgmt_be_txn_ctx * +mgmt_be_find_txn_by_id(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id) +{ + struct mgmt_be_txn_ctx *txn = NULL; + + FOREACH_BE_TXN_IN_LIST (client_ctx, txn) { + if (txn->txn_id == txn_id) + return txn; + } + + return NULL; +} + +static struct mgmt_be_txn_ctx * +mgmt_be_txn_create(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id) +{ + struct mgmt_be_txn_ctx *txn = NULL; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (!txn) { + txn = XCALLOC(MTYPE_MGMTD_BE_TXN, + sizeof(struct mgmt_be_txn_ctx)); + assert(txn); + + txn->txn_id = txn_id; + txn->client_ctx = client_ctx; + mgmt_be_batches_init(&txn->cfg_batches); + mgmt_be_batches_init(&txn->apply_cfgs); + mgmt_be_txns_add_tail(&client_ctx->txn_head, txn); + + MGMTD_BE_CLIENT_DBG("Added new transaction 0x%llx", + (unsigned long long)txn_id); + } + + return txn; +} + +static void mgmt_be_txn_delete(struct mgmt_be_client_ctx *client_ctx, + struct mgmt_be_txn_ctx **txn) +{ + char err_msg[] = "MGMT Transaction Delete"; + + if (!txn) + return; + + /* + * Remove the transaction from the list of transactions + * so that future lookups with the same transaction id + * does not return this one. + */ + mgmt_be_txns_del(&client_ctx->txn_head, *txn); + + /* + * Time to delete the transaction which should also + * take care of cleaning up all batches created via + * CFGDATA_CREATE_REQs. But first notify the client + * about the transaction delete. + */ + if (client_ctx->client_params.txn_notify) + (void)(*client_ctx->client_params + .txn_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + &(*txn)->client_data, true); + + mgmt_be_cleanup_all_batches(*txn); + if ((*txn)->nb_txn) + nb_candidate_commit_abort((*txn)->nb_txn, err_msg, + sizeof(err_msg)); + XFREE(MTYPE_MGMTD_BE_TXN, *txn); + + *txn = NULL; +} + +static void +mgmt_be_cleanup_all_txns(struct mgmt_be_client_ctx *client_ctx) +{ + struct mgmt_be_txn_ctx *txn = NULL; + + FOREACH_BE_TXN_IN_LIST (client_ctx, txn) { + mgmt_be_txn_delete(client_ctx, &txn); + } +} + +static int mgmt_be_send_txn_reply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, bool create, + bool success) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeTxnReply txn_reply; + + mgmtd__be_txn_reply__init(&txn_reply); + txn_reply.create = create; + txn_reply.txn_id = txn_id; + txn_reply.success = success; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_TXN_REPLY; + be_msg.txn_reply = &txn_reply; + + MGMTD_BE_CLIENT_DBG( + "Sending TXN_REPLY message to MGMTD for txn 0x%llx", + (unsigned long long)txn_id); + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static int mgmt_be_process_txn_req(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, bool create) +{ + struct mgmt_be_txn_ctx *txn; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (create) { + if (txn) { + /* + * Transaction with same txn-id already exists. + * Should not happen under any circumstances. + */ + MGMTD_BE_CLIENT_ERR( + "Transaction 0x%llx already exists!!!", + (unsigned long long)txn_id); + mgmt_be_send_txn_reply(client_ctx, txn_id, create, + false); + } + + MGMTD_BE_CLIENT_DBG("Created new transaction 0x%llx", + (unsigned long long)txn_id); + txn = mgmt_be_txn_create(client_ctx, txn_id); + + if (client_ctx->client_params.txn_notify) + (void)(*client_ctx->client_params + .txn_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + &txn->client_data, false); + } else { + if (!txn) { + /* + * Transaction with same txn-id does not exists. + * Return sucess anyways. + */ + MGMTD_BE_CLIENT_DBG( + "Transaction to delete 0x%llx does NOT exists!!!", + (unsigned long long)txn_id); + } else { + MGMTD_BE_CLIENT_DBG("Delete transaction 0x%llx", + (unsigned long long)txn_id); + mgmt_be_txn_delete(client_ctx, &txn); + } + } + + mgmt_be_send_txn_reply(client_ctx, txn_id, create, true); + + return 0; +} + +static int +mgmt_be_send_cfgdata_create_reply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, uint64_t batch_id, + bool success, const char *error_if_any) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataCreateReply cfgdata_reply; + + mgmtd__be_cfg_data_create_reply__init(&cfgdata_reply); + cfgdata_reply.txn_id = (uint64_t)txn_id; + cfgdata_reply.batch_id = (uint64_t)batch_id; + cfgdata_reply.success = success; + if (error_if_any) + cfgdata_reply.error_if_any = (char *)error_if_any; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY; + be_msg.cfg_data_reply = &cfgdata_reply; + + MGMTD_BE_CLIENT_DBG( + "Sending CFGDATA_CREATE_REPLY message to MGMTD for txn 0x%llx batch 0x%llx", + (unsigned long long)txn_id, (unsigned long long)batch_id); + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static void mgmt_be_txn_cfg_abort(struct mgmt_be_txn_ctx *txn) +{ + char errmsg[BUFSIZ] = {0}; + + assert(txn && txn->client_ctx); + if (txn->nb_txn) { + MGMTD_BE_CLIENT_ERR( + "Aborting configurations after prep for Txn 0x%llx", + (unsigned long long)txn->txn_id); + nb_candidate_commit_abort(txn->nb_txn, errmsg, sizeof(errmsg)); + txn->nb_txn = 0; + } + + /* + * revert candidate back to running + * + * This is one txn ctx but the candidate_config is per client ctx, how + * does that work? + */ + MGMTD_BE_CLIENT_DBG( + "Reset candidate configurations after abort of Txn 0x%llx", + (unsigned long long)txn->txn_id); + nb_config_replace(txn->client_ctx->candidate_config, + txn->client_ctx->running_config, true); +} + +static int mgmt_be_txn_cfg_prepare(struct mgmt_be_txn_ctx *txn) +{ + struct mgmt_be_client_ctx *client_ctx; + struct mgmt_be_txn_req *txn_req = NULL; + struct nb_context nb_ctx = {0}; + struct timeval edit_nb_cfg_start; + struct timeval edit_nb_cfg_end; + unsigned long edit_nb_cfg_tm; + struct timeval prep_nb_cfg_start; + struct timeval prep_nb_cfg_end; + unsigned long prep_nb_cfg_tm; + struct mgmt_be_batch_ctx *batch; + bool error; + char err_buf[BUFSIZ]; + size_t num_processed; + bool debug_be = mgmt_debug_be_client; + int err; + + assert(txn && txn->client_ctx); + client_ctx = txn->client_ctx; + + num_processed = 0; + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + txn_req = &batch->txn_req; + error = false; + nb_ctx.client = NB_CLIENT_CLI; + nb_ctx.user = (void *)client_ctx->client_params.user_data; + + if (!txn->nb_txn) { + /* + * This happens when the current backend client is only + * interested in consuming the config items but is not + * interested in validating it. + */ + error = false; + if (debug_be) + gettimeofday(&edit_nb_cfg_start, NULL); + nb_candidate_edit_config_changes( + client_ctx->candidate_config, + txn_req->req.set_cfg.cfg_changes, + (size_t)txn_req->req.set_cfg.num_cfg_changes, + NULL, NULL, 0, err_buf, sizeof(err_buf), + &error); + if (error) { + err_buf[sizeof(err_buf) - 1] = 0; + MGMTD_BE_CLIENT_ERR( + "Failed to update configs for Txn %llx Batch %llx to Candidate! Err: '%s'", + (unsigned long long)txn->txn_id, + (unsigned long long)batch->batch_id, + err_buf); + return -1; + } + if (debug_be) { + gettimeofday(&edit_nb_cfg_end, NULL); + edit_nb_cfg_tm = timeval_elapsed( + edit_nb_cfg_end, edit_nb_cfg_start); + client_ctx->avg_edit_nb_cfg_tm = + ((client_ctx->avg_edit_nb_cfg_tm + * client_ctx->num_edit_nb_cfg) + + edit_nb_cfg_tm) + / (client_ctx->num_edit_nb_cfg + 1); + } + client_ctx->num_edit_nb_cfg++; + } + + num_processed++; + } + + if (!num_processed) + return 0; + + /* + * Now prepare all the batches we have applied in one go. + */ + nb_ctx.client = NB_CLIENT_CLI; + nb_ctx.user = (void *)client_ctx->client_params.user_data; + if (debug_be) + gettimeofday(&prep_nb_cfg_start, NULL); + err = nb_candidate_commit_prepare(nb_ctx, client_ctx->candidate_config, + "MGMTD Backend Txn", &txn->nb_txn, +#ifdef MGMTD_LOCAL_VALIDATIONS_ENABLED + true, true, +#else + false, true, +#endif + err_buf, sizeof(err_buf) - 1); + if (err != NB_OK) { + err_buf[sizeof(err_buf) - 1] = 0; + if (err == NB_ERR_VALIDATION) + MGMTD_BE_CLIENT_ERR( + "Failed to validate configs for Txn %llx %u Batches! Err: '%s'", + (unsigned long long)txn->txn_id, + (uint32_t)num_processed, err_buf); + else + MGMTD_BE_CLIENT_ERR( + "Failed to prepare configs for Txn %llx, %u Batches! Err: '%s'", + (unsigned long long)txn->txn_id, + (uint32_t)num_processed, err_buf); + error = true; + SET_FLAG(txn->flags, MGMTD_BE_TXN_FLAGS_CFGPREP_FAILED); + } else + MGMTD_BE_CLIENT_DBG( + "Prepared configs for Txn %llx, %u Batches! successfully!", + (unsigned long long)txn->txn_id, + (uint32_t)num_processed); + if (debug_be) { + gettimeofday(&prep_nb_cfg_end, NULL); + prep_nb_cfg_tm = + timeval_elapsed(prep_nb_cfg_end, prep_nb_cfg_start); + client_ctx->avg_prep_nb_cfg_tm = + ((client_ctx->avg_prep_nb_cfg_tm + * client_ctx->num_prep_nb_cfg) + + prep_nb_cfg_tm) + / (client_ctx->num_prep_nb_cfg + 1); + } + client_ctx->num_prep_nb_cfg++; + + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + mgmt_be_send_cfgdata_create_reply( + client_ctx, txn->txn_id, batch->batch_id, + error ? false : true, error ? err_buf : NULL); + if (!error) { + SET_FLAG(batch->flags, + MGMTD_BE_BATCH_FLAGS_CFG_PREPARED); + mgmt_be_batches_del(&txn->cfg_batches, batch); + mgmt_be_batches_add_tail(&txn->apply_cfgs, batch); + } + } + + if (debug_be) + MGMTD_BE_CLIENT_DBG( + "Avg-nb-edit-duration %lu uSec, nb-prep-duration %lu (avg: %lu) uSec, batch size %u", + client_ctx->avg_edit_nb_cfg_tm, prep_nb_cfg_tm, + client_ctx->avg_prep_nb_cfg_tm, (uint32_t)num_processed); + + if (error) + mgmt_be_txn_cfg_abort(txn); + + return 0; +} + +/* + * Process all CFG_DATA_REQs received so far and prepare them all in one go. + */ +static int +mgmt_be_update_setcfg_in_batch(struct mgmt_be_client_ctx *client_ctx, + struct mgmt_be_txn_ctx *txn, + uint64_t batch_id, + Mgmtd__YangCfgDataReq * cfg_req[], + int num_req) +{ + struct mgmt_be_batch_ctx *batch = NULL; + struct mgmt_be_txn_req *txn_req = NULL; + int index; + struct nb_cfg_change *cfg_chg; + + batch = mgmt_be_batch_create(txn, batch_id); + if (!batch) { + MGMTD_BE_CLIENT_ERR("Batch create failed!"); + return -1; + } + + txn_req = &batch->txn_req; + txn_req->event = MGMTD_BE_TXN_PROC_SETCFG; + MGMTD_BE_CLIENT_DBG( + "Created Set-Config request for batch 0x%llx, txn id 0x%llx, cfg-items:%d", + (unsigned long long)batch_id, (unsigned long long)txn->txn_id, + num_req); + + txn_req->req.set_cfg.num_cfg_changes = num_req; + for (index = 0; index < num_req; index++) { + cfg_chg = &txn_req->req.set_cfg.cfg_changes[index]; + + if (cfg_req[index]->req_type + == MGMTD__CFG_DATA_REQ_TYPE__DELETE_DATA) + cfg_chg->operation = NB_OP_DESTROY; + else + cfg_chg->operation = NB_OP_CREATE; + + strlcpy(cfg_chg->xpath, cfg_req[index]->data->xpath, + sizeof(cfg_chg->xpath)); + cfg_chg->value = (cfg_req[index]->data->value + && cfg_req[index] + ->data->value + ->encoded_str_val + ? strdup(cfg_req[index] + ->data->value + ->encoded_str_val) + : NULL); + if (cfg_chg->value + && !strncmp(cfg_chg->value, MGMTD_BE_CONTAINER_NODE_VAL, + strlen(MGMTD_BE_CONTAINER_NODE_VAL))) { + free((char *)cfg_chg->value); + cfg_chg->value = NULL; + } + } + + return 0; +} + +static int +mgmt_be_process_cfgdata_req(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, uint64_t batch_id, + Mgmtd__YangCfgDataReq * cfg_req[], int num_req, + bool end_of_data) +{ + struct mgmt_be_txn_ctx *txn; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (!txn) { + MGMTD_BE_CLIENT_ERR( + "Invalid txn-id 0x%llx provided from MGMTD server", + (unsigned long long)txn_id); + mgmt_be_send_cfgdata_create_reply( + client_ctx, txn_id, batch_id, false, + "Transaction context not created yet"); + } else { + mgmt_be_update_setcfg_in_batch(client_ctx, txn, batch_id, + cfg_req, num_req); + } + + if (txn && end_of_data) { + MGMTD_BE_CLIENT_DBG("Triggering CFG_PREPARE_REQ processing"); + mgmt_be_txn_cfg_prepare(txn); + } + + return 0; +} + +static int mgmt_be_send_apply_reply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, uint64_t batch_ids[], + size_t num_batch_ids, bool success, + const char *error_if_any) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataApplyReply apply_reply; + + mgmtd__be_cfg_data_apply_reply__init(&apply_reply); + apply_reply.success = success; + apply_reply.txn_id = txn_id; + apply_reply.batch_ids = (uint64_t *)batch_ids; + apply_reply.n_batch_ids = num_batch_ids; + + if (error_if_any) + apply_reply.error_if_any = (char *)error_if_any; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REPLY; + be_msg.cfg_apply_reply = &apply_reply; + + MGMTD_BE_CLIENT_DBG( + "Sending CFG_APPLY_REPLY message to MGMTD for txn 0x%llx, %d batches [0x%llx - 0x%llx]", + (unsigned long long)txn_id, (int)num_batch_ids, + success && num_batch_ids ? + (unsigned long long)batch_ids[0] : 0, + success && num_batch_ids ? + (unsigned long long)batch_ids[num_batch_ids - 1] : 0); + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static int mgmt_be_txn_proc_cfgapply(struct mgmt_be_txn_ctx *txn) +{ + struct mgmt_be_client_ctx *client_ctx; + struct timeval apply_nb_cfg_start; + struct timeval apply_nb_cfg_end; + unsigned long apply_nb_cfg_tm; + struct mgmt_be_batch_ctx *batch; + char err_buf[BUFSIZ]; + size_t num_processed; + static uint64_t batch_ids[MGMTD_BE_MAX_BATCH_IDS_IN_REQ]; + bool debug_be = mgmt_debug_be_client; + + assert(txn && txn->client_ctx); + client_ctx = txn->client_ctx; + + assert(txn->nb_txn); + num_processed = 0; + + /* + * Now apply all the batches we have applied in one go. + */ + if (debug_be) + gettimeofday(&apply_nb_cfg_start, NULL); + (void)nb_candidate_commit_apply(txn->nb_txn, true, &txn->nb_txn_id, + err_buf, sizeof(err_buf) - 1); + if (debug_be) { + gettimeofday(&apply_nb_cfg_end, NULL); + apply_nb_cfg_tm = + timeval_elapsed(apply_nb_cfg_end, apply_nb_cfg_start); + client_ctx->avg_apply_nb_cfg_tm = + ((client_ctx->avg_apply_nb_cfg_tm + * client_ctx->num_apply_nb_cfg) + + apply_nb_cfg_tm) + / (client_ctx->num_apply_nb_cfg + 1); + } + client_ctx->num_apply_nb_cfg++; + txn->nb_txn = NULL; + + /* + * Send back CFG_APPLY_REPLY for all batches applied. + */ + FOREACH_BE_APPLY_BATCH_IN_LIST (txn, batch) { + /* + * No need to delete the batch yet. Will be deleted during + * transaction cleanup on receiving TXN_DELETE_REQ. + */ + SET_FLAG(batch->flags, MGMTD_BE_TXN_FLAGS_CFG_APPLIED); + mgmt_be_batches_del(&txn->apply_cfgs, batch); + mgmt_be_batches_add_tail(&txn->cfg_batches, batch); + + batch_ids[num_processed] = batch->batch_id; + num_processed++; + if (num_processed == MGMTD_BE_MAX_BATCH_IDS_IN_REQ) { + mgmt_be_send_apply_reply(client_ctx, txn->txn_id, + batch_ids, num_processed, + true, NULL); + num_processed = 0; + } + } + + mgmt_be_send_apply_reply(client_ctx, txn->txn_id, batch_ids, + num_processed, true, NULL); + + if (debug_be) + MGMTD_BE_CLIENT_DBG("Nb-apply-duration %lu (avg: %lu) uSec", + apply_nb_cfg_tm, + client_ctx->avg_apply_nb_cfg_tm); + + return 0; +} + +static int +mgmt_be_process_cfg_apply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id) +{ + struct mgmt_be_txn_ctx *txn; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (!txn) { + mgmt_be_send_apply_reply(client_ctx, txn_id, NULL, 0, false, + "Transaction not created yet!"); + return -1; + } + + MGMTD_BE_CLIENT_DBG("Trigger CFG_APPLY_REQ processing"); + mgmt_be_txn_proc_cfgapply(txn); + + return 0; +} + +static int +mgmt_be_client_handle_msg(struct mgmt_be_client_ctx *client_ctx, + Mgmtd__BeMessage *be_msg) +{ + switch (be_msg->message_case) { + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REPLY: + MGMTD_BE_CLIENT_DBG("Subscribe Reply Msg from mgmt, status %u", + be_msg->subscr_reply->success); + break; + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REQ: + mgmt_be_process_txn_req(client_ctx, + be_msg->txn_req->txn_id, + be_msg->txn_req->create); + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REQ: + mgmt_be_process_cfgdata_req( + client_ctx, be_msg->cfg_data_req->txn_id, + be_msg->cfg_data_req->batch_id, + be_msg->cfg_data_req->data_req, + be_msg->cfg_data_req->n_data_req, + be_msg->cfg_data_req->end_of_data); + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REQ: + mgmt_be_process_cfg_apply( + client_ctx, (uint64_t)be_msg->cfg_apply_req->txn_id); + break; + case MGMTD__BE_MESSAGE__MESSAGE_GET_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REQ: + /* + * TODO: Add handling code in future. + */ + break; + /* + * NOTE: The following messages are always sent from Backend + * clients to MGMTd only and/or need not be handled here. + */ + case MGMTD__BE_MESSAGE__MESSAGE_GET_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_NOTIFY_DATA: + case MGMTD__BE_MESSAGE__MESSAGE__NOT_SET: +#if PROTOBUF_C_VERSION_NUMBER >= 1003000 + case _MGMTD__BE_MESSAGE__MESSAGE_IS_INT_SIZE: +#endif + default: + /* + * A 'default' case is being added contrary to the + * FRR code guidelines to take care of build + * failures on certain build systems (courtesy of + * the proto-c package). + */ + break; + } + + return 0; +} + +static int +mgmt_be_client_process_msg(struct mgmt_be_client_ctx *client_ctx, + uint8_t *msg_buf, int bytes_read) +{ + Mgmtd__BeMessage *be_msg; + struct mgmt_be_msg *msg; + uint16_t bytes_left; + uint16_t processed = 0; + + MGMTD_BE_CLIENT_DBG( + "Got message of %d bytes from MGMTD Backend Server", + bytes_read); + + bytes_left = bytes_read; + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN; + bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { + msg = (struct mgmt_be_msg *)msg_buf; + if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) { + MGMTD_BE_CLIENT_DBG( + "Marker not found in message from MGMTD '%s'", + client_ctx->client_params.name); + break; + } + + if (bytes_left < msg->hdr.len) { + MGMTD_BE_CLIENT_DBG( + "Incomplete message of %d bytes (epxected: %u) from MGMTD '%s'", + bytes_left, msg->hdr.len, + client_ctx->client_params.name); + break; + } + + be_msg = mgmtd__be_message__unpack( + NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN), + msg->payload); + if (!be_msg) { + MGMTD_BE_CLIENT_DBG( + "Failed to decode %d bytes from MGMTD '%s'", + msg->hdr.len, client_ctx->client_params.name); + continue; + } + + (void)mgmt_be_client_handle_msg(client_ctx, be_msg); + mgmtd__be_message__free_unpacked(be_msg, NULL); + processed++; + client_ctx->num_msg_rx++; + } + + return processed; +} + +static void mgmt_be_client_proc_msgbufs(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + struct stream *work; + int processed = 0; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx); + + if (client_ctx->conn_fd == 0) + return; + + for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) { + work = stream_fifo_pop_safe(client_ctx->ibuf_fifo); + if (!work) + break; + + processed += mgmt_be_client_process_msg( + client_ctx, STREAM_DATA(work), stream_get_endp(work)); + + if (work != client_ctx->ibuf_work) { + /* Free it up */ + stream_free(work); + } else { + /* Reset stream buffer for next read */ + stream_reset(work); + } + } + + /* + * If we have more to process, reschedule for processing it. + */ + if (stream_fifo_head(client_ctx->ibuf_fifo)) + mgmt_be_client_register_event(client_ctx, + MGMTD_BE_PROC_MSG); +} + +static void mgmt_be_client_read(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + int bytes_read, msg_cnt; + size_t total_bytes, bytes_left; + struct mgmt_be_msg_hdr *msg_hdr; + bool incomplete = false; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + total_bytes = 0; + bytes_left = STREAM_SIZE(client_ctx->ibuf_work) + - stream_get_endp(client_ctx->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + bytes_read = stream_read_try(client_ctx->ibuf_work, + client_ctx->conn_fd, bytes_left); + MGMTD_BE_CLIENT_DBG( + "Got %d bytes of message from MGMTD Backend server", + bytes_read); + /* -2 is normal nothing read, and to retry */ + if (bytes_read == -2) + break; + if (bytes_read <= 0) { + if (bytes_read == 0) { + MGMTD_BE_CLIENT_ERR( + "Got EOF/disconnect while reading from MGMTD Frontend server."); + } else { + MGMTD_BE_CLIENT_ERR( + "Got error (%d) while reading from MGMTD Backend server. Err: '%s'", + bytes_read, safe_strerror(errno)); + } + mgmt_be_server_disconnect(client_ctx, true); + return; + } + total_bytes += bytes_read; + bytes_left -= bytes_read; + } + /* + * Check if we have read complete messages or not. + */ + stream_set_getp(client_ctx->ibuf_work, 0); + total_bytes = 0; + msg_cnt = 0; + bytes_left = stream_get_endp(client_ctx->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + msg_hdr = (struct mgmt_be_msg_hdr + *)(STREAM_DATA(client_ctx->ibuf_work) + + total_bytes); + if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) { + /* Corrupted buffer. Force disconnect?? */ + MGMTD_BE_CLIENT_ERR( + "Received corrupted buffer from MGMTD backend server."); + mgmt_be_server_disconnect(client_ctx, true); + return; + } + if (msg_hdr->len > bytes_left) + break; + + total_bytes += msg_hdr->len; + bytes_left -= msg_hdr->len; + msg_cnt++; + } + + if (!msg_cnt) + goto resched; + + if (bytes_left > 0) + incomplete = true; + + /* + * We have read one or several messages. + * Schedule processing them now. + */ + msg_hdr = + (struct mgmt_be_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) + + total_bytes); + stream_set_endp(client_ctx->ibuf_work, total_bytes); + stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work); + client_ctx->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (incomplete) { + stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left); + stream_set_endp(client_ctx->ibuf_work, bytes_left); + } + mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); + +resched: + mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); +} + +static inline void +mgmt_be_client_sched_msg_write(struct mgmt_be_client_ctx *client_ctx) +{ + if (!CHECK_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF)) + mgmt_be_client_register_event(client_ctx, + MGMTD_BE_CONN_WRITE); +} + +static inline void +mgmt_be_client_writes_on(struct mgmt_be_client_ctx *client_ctx) +{ + MGMTD_BE_CLIENT_DBG("Resume writing msgs"); + UNSET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF); + if (client_ctx->obuf_work + || stream_fifo_count_safe(client_ctx->obuf_fifo)) + mgmt_be_client_sched_msg_write(client_ctx); +} + +static inline void +mgmt_be_client_writes_off(struct mgmt_be_client_ctx *client_ctx) +{ + SET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF); + MGMTD_BE_CLIENT_DBG("Paused writing msgs"); +} + +static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, + Mgmtd__BeMessage *be_msg) +{ + size_t msg_size; + uint8_t *msg_buf = client_ctx->msg_buf; + struct mgmt_be_msg *msg; + + if (client_ctx->conn_fd == 0) + return -1; + + msg_size = mgmtd__be_message__get_packed_size(be_msg); + msg_size += MGMTD_BE_MSG_HDR_LEN; + if (msg_size > MGMTD_BE_MSG_MAX_LEN) { + MGMTD_BE_CLIENT_ERR( + "Message size %d more than max size'%d. Not sending!'", + (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN); + return -1; + } + + msg = (struct mgmt_be_msg *)msg_buf; + msg->hdr.marker = MGMTD_BE_MSG_MARKER; + msg->hdr.len = (uint16_t)msg_size; + mgmtd__be_message__pack(be_msg, msg->payload); + + if (!client_ctx->obuf_work) + client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) { + stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); + client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + } + stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size); + + mgmt_be_client_sched_msg_write(client_ctx); + client_ctx->num_msg_tx++; + return 0; +} + +static void mgmt_be_client_write(struct thread *thread) +{ + int bytes_written = 0; + int processed = 0; + int msg_size = 0; + struct stream *s = NULL; + struct stream *free = NULL; + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + /* Ensure pushing any pending write buffer to FIFO */ + if (client_ctx->obuf_work) { + stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); + client_ctx->obuf_work = NULL; + } + + for (s = stream_fifo_head(client_ctx->obuf_fifo); + s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE; + s = stream_fifo_head(client_ctx->obuf_fifo)) { + /* msg_size = (int)stream_get_size(s); */ + msg_size = (int)STREAM_READABLE(s); + bytes_written = stream_flush(s, client_ctx->conn_fd); + if (bytes_written == -1 + && (errno == EAGAIN || errno == EWOULDBLOCK)) { + mgmt_be_client_register_event( + client_ctx, MGMTD_BE_CONN_WRITE); + return; + } else if (bytes_written != msg_size) { + MGMTD_BE_CLIENT_ERR( + "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", + msg_size, bytes_written, safe_strerror(errno)); + if (bytes_written > 0) { + stream_forward_getp(s, (size_t)bytes_written); + stream_pulldown(s); + mgmt_be_client_register_event( + client_ctx, MGMTD_BE_CONN_WRITE); + return; + } + mgmt_be_server_disconnect(client_ctx, true); + return; + } + + free = stream_fifo_pop(client_ctx->obuf_fifo); + stream_free(free); + MGMTD_BE_CLIENT_DBG( + "Wrote %d bytes of message to MGMTD Backend client socket.'", + bytes_written); + processed++; + } + + if (s) { + mgmt_be_client_writes_off(client_ctx); + mgmt_be_client_register_event(client_ctx, + MGMTD_BE_CONN_WRITES_ON); + } +} + +static void mgmt_be_client_resume_writes(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + mgmt_be_client_writes_on(client_ctx); +} + +static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx, + bool subscr_xpaths, + uint16_t num_reg_xpaths, + char **reg_xpaths) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeSubscribeReq subscr_req; + + mgmtd__be_subscribe_req__init(&subscr_req); + subscr_req.client_name = client_ctx->client_params.name; + subscr_req.n_xpath_reg = num_reg_xpaths; + if (num_reg_xpaths) + subscr_req.xpath_reg = reg_xpaths; + else + subscr_req.xpath_reg = NULL; + subscr_req.subscribe_xpaths = subscr_xpaths; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ; + be_msg.subscr_req = &subscr_req; + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static int mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx) +{ + int ret, sock, len; + struct sockaddr_un addr; + + MGMTD_BE_CLIENT_DBG("Trying to connect to MGMTD Backend server at %s", + MGMTD_BE_SERVER_PATH); + + assert(!client_ctx->conn_fd); + + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { + MGMTD_BE_CLIENT_ERR("Failed to create socket"); + goto mgmt_be_server_connect_failed; + } + + MGMTD_BE_CLIENT_DBG( + "Created MGMTD Backend server socket successfully!"); + + memset(&addr, 0, sizeof(struct sockaddr_un)); + addr.sun_family = AF_UNIX; + strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path)); +#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN + len = addr.sun_len = SUN_LEN(&addr); +#else + len = sizeof(addr.sun_family) + strlen(addr.sun_path); +#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */ + + ret = connect(sock, (struct sockaddr *)&addr, len); + if (ret < 0) { + MGMTD_BE_CLIENT_ERR( + "Failed to connect to MGMTD Backend Server at %s. Err: %s", + addr.sun_path, safe_strerror(errno)); + close(sock); + goto mgmt_be_server_connect_failed; + } + + MGMTD_BE_CLIENT_DBG( + "Connected to MGMTD Backend Server at %s successfully!", + addr.sun_path); + client_ctx->conn_fd = sock; + + /* Make client socket non-blocking. */ + set_nonblocking(sock); + setsockopt_so_sendbuf(client_ctx->conn_fd, + MGMTD_SOCKET_BE_SEND_BUF_SIZE); + setsockopt_so_recvbuf(client_ctx->conn_fd, + MGMTD_SOCKET_BE_RECV_BUF_SIZE); + + mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); + + /* Notify client through registered callback (if any) */ + if (client_ctx->client_params.client_connect_notify) + (void)(*client_ctx->client_params + .client_connect_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, true); + + /* Send SUBSCRIBE_REQ message */ + if (mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) + goto mgmt_be_server_connect_failed; + + return 0; + +mgmt_be_server_connect_failed: + if (sock && sock != client_ctx->conn_fd) + close(sock); + + mgmt_be_server_disconnect(client_ctx, true); + return -1; +} + +static void mgmt_be_client_conn_timeout(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx); + + mgmt_be_server_connect(client_ctx); +} + +static void +mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, + enum mgmt_be_event event) +{ + struct timeval tv = {0}; + + switch (event) { + case MGMTD_BE_CONN_READ: + thread_add_read(client_ctx->tm, mgmt_be_client_read, + client_ctx, client_ctx->conn_fd, + &client_ctx->conn_read_ev); + assert(client_ctx->conn_read_ev); + break; + case MGMTD_BE_CONN_WRITE: + thread_add_write(client_ctx->tm, mgmt_be_client_write, + client_ctx, client_ctx->conn_fd, + &client_ctx->conn_write_ev); + assert(client_ctx->conn_write_ev); + break; + case MGMTD_BE_PROC_MSG: + tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC; + thread_add_timer_tv(client_ctx->tm, + mgmt_be_client_proc_msgbufs, client_ctx, + &tv, &client_ctx->msg_proc_ev); + assert(client_ctx->msg_proc_ev); + break; + case MGMTD_BE_CONN_WRITES_ON: + thread_add_timer_msec( + client_ctx->tm, mgmt_be_client_resume_writes, + client_ctx, MGMTD_BE_MSG_WRITE_DELAY_MSEC, + &client_ctx->conn_writes_on); + assert(client_ctx->conn_writes_on); + break; + case MGMTD_BE_SERVER: + case MGMTD_BE_CONN_INIT: + case MGMTD_BE_SCHED_CFG_PREPARE: + case MGMTD_BE_RESCHED_CFG_PREPARE: + case MGMTD_BE_SCHED_CFG_APPLY: + case MGMTD_BE_RESCHED_CFG_APPLY: + assert(!"mgmt_be_client_post_event() called incorrectly"); + break; + } +} + +static void +mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx, + unsigned long intvl_secs) +{ + MGMTD_BE_CLIENT_DBG( + "Scheduling MGMTD Backend server connection retry after %lu seconds", + intvl_secs); + thread_add_timer(client_ctx->tm, mgmt_be_client_conn_timeout, + (void *)client_ctx, intvl_secs, + &client_ctx->conn_retry_tmr); +} + +extern struct nb_config *running_config; + +/* + * Initialize library and try connecting with MGMTD. + */ +uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params, + struct thread_master *master_thread) +{ + assert(master_thread && params && strlen(params->name) + && !mgmt_be_client_ctx.tm); + + mgmt_be_client_ctx.tm = master_thread; + + if (!running_config) + assert(!"MGMTD Be Client lib_init() after frr_init() only!"); + mgmt_be_client_ctx.running_config = running_config; + mgmt_be_client_ctx.candidate_config = nb_config_new(NULL); + + memcpy(&mgmt_be_client_ctx.client_params, params, + sizeof(mgmt_be_client_ctx.client_params)); + if (!mgmt_be_client_ctx.client_params.conn_retry_intvl_sec) + mgmt_be_client_ctx.client_params.conn_retry_intvl_sec = + MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC; + + assert(!mgmt_be_client_ctx.ibuf_fifo && !mgmt_be_client_ctx.ibuf_work && + !mgmt_be_client_ctx.obuf_fifo && !mgmt_be_client_ctx.obuf_work); + + mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head); + mgmt_be_client_ctx.ibuf_fifo = stream_fifo_new(); + mgmt_be_client_ctx.ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + mgmt_be_client_ctx.obuf_fifo = stream_fifo_new(); + /* mgmt_be_client_ctx.obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + */ + mgmt_be_client_ctx.obuf_work = NULL; + + /* Start trying to connect to MGMTD backend server immediately */ + mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1); + + MGMTD_BE_CLIENT_DBG("Initialized client '%s'", params->name); + + return (uintptr_t)&mgmt_be_client_ctx; +} + +/* + * Subscribe with MGMTD for one or more YANG subtree(s). + */ +enum mgmt_result mgmt_be_subscribe_yang_data(uintptr_t lib_hndl, + char *reg_yang_xpaths[], + int num_reg_xpaths) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + if (mgmt_be_send_subscr_req(client_ctx, true, num_reg_xpaths, + reg_yang_xpaths) + != 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Unsubscribe with MGMTD for one or more YANG subtree(s). + */ +enum mgmt_result mgmt_be_unsubscribe_yang_data(uintptr_t lib_hndl, + char *reg_yang_xpaths[], + int num_reg_xpaths) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + + if (mgmt_be_send_subscr_req(client_ctx, false, num_reg_xpaths, + reg_yang_xpaths) + < 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Send one or more YANG notifications to MGMTD daemon. + */ +enum mgmt_result mgmt_be_send_yang_notify(uintptr_t lib_hndl, + Mgmtd__YangData * data_elems[], + int num_elems) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + return MGMTD_SUCCESS; +} + +/* + * Destroy library and cleanup everything. + */ +void mgmt_be_client_lib_destroy(uintptr_t lib_hndl) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + assert(client_ctx); + + MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'", + client_ctx->client_params.name); + + mgmt_be_server_disconnect(client_ctx, false); + + assert(mgmt_be_client_ctx.ibuf_fifo && mgmt_be_client_ctx.obuf_fifo); + + stream_fifo_free(mgmt_be_client_ctx.ibuf_fifo); + if (mgmt_be_client_ctx.ibuf_work) + stream_free(mgmt_be_client_ctx.ibuf_work); + stream_fifo_free(mgmt_be_client_ctx.obuf_fifo); + if (mgmt_be_client_ctx.obuf_work) + stream_free(mgmt_be_client_ctx.obuf_work); + + THREAD_OFF(client_ctx->conn_retry_tmr); + THREAD_OFF(client_ctx->conn_read_ev); + THREAD_OFF(client_ctx->conn_write_ev); + THREAD_OFF(client_ctx->conn_writes_on); + THREAD_OFF(client_ctx->msg_proc_ev); + mgmt_be_cleanup_all_txns(client_ctx); + mgmt_be_txns_fini(&client_ctx->txn_head); +} diff --git a/lib/mgmt_be_client.h b/lib/mgmt_be_client.h new file mode 100644 index 0000000000..75b4ddf017 --- /dev/null +++ b/lib/mgmt_be_client.h @@ -0,0 +1,273 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Library api interfaces + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#ifndef _FRR_MGMTD_BE_CLIENT_H_ +#define _FRR_MGMTD_BE_CLIENT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "mgmtd/mgmt_defines.h" + +/*************************************************************** + * Client IDs + ***************************************************************/ + +/* + * Add enum value for each supported component, wrap with + * #ifdef HAVE_COMPONENT + */ +enum mgmt_be_client_id { + MGMTD_BE_CLIENT_ID_MIN = 0, + MGMTD_BE_CLIENT_ID_INIT = -1, +#if 0 /* example */ +#ifdef HAVE_STATICD + MGMTD_BE_CLIENT_ID_STATICD, +#endif +#endif + MGMTD_BE_CLIENT_ID_MAX +}; + +#define FOREACH_MGMTD_BE_CLIENT_ID(id) \ + for ((id) = MGMTD_BE_CLIENT_ID_MIN; \ + (id) < MGMTD_BE_CLIENT_ID_MAX; (id)++) + +/*************************************************************** + * Constants + ***************************************************************/ + +#define MGMTD_BE_CLIENT_ERROR_STRING_MAX_LEN 32 + +#define MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC 5 + +#define MGMTD_BE_MSG_PROC_DELAY_USEC 10 +#define MGMTD_BE_MAX_NUM_MSG_PROC 500 + +#define MGMTD_BE_MSG_WRITE_DELAY_MSEC 1 +#define MGMTD_BE_MAX_NUM_MSG_WRITE 1000 + +#define GMGD_BE_MAX_NUM_REQ_ITEMS 64 + +#define MGMTD_BE_MSG_MAX_LEN 16384 + +#define MGMTD_SOCKET_BE_SEND_BUF_SIZE 65535 +#define MGMTD_SOCKET_BE_RECV_BUF_SIZE MGMTD_SOCKET_BE_SEND_BUF_SIZE + +#define MGMTD_MAX_CFG_CHANGES_IN_BATCH \ + ((10 * MGMTD_BE_MSG_MAX_LEN) / \ + (MGMTD_MAX_XPATH_LEN + MGMTD_MAX_YANG_VALUE_LEN)) + +/* + * MGMTD_BE_MSG_MAX_LEN must be used 80% + * since there is overhead of google protobuf + * that gets added to sent message + */ +#define MGMTD_BE_CFGDATA_PACKING_EFFICIENCY 0.8 +#define MGMTD_BE_CFGDATA_MAX_MSG_LEN \ + (MGMTD_BE_MSG_MAX_LEN * MGMTD_BE_CFGDATA_PACKING_EFFICIENCY) + +#define MGMTD_BE_MAX_BATCH_IDS_IN_REQ \ + (MGMTD_BE_MSG_MAX_LEN - 128) / sizeof(uint64_t) + +#define MGMTD_BE_CONTAINER_NODE_VAL "<<container>>" + +/*************************************************************** + * Data-structures + ***************************************************************/ + +#define MGMTD_BE_MAX_CLIENTS_PER_XPATH_REG 32 + +struct mgmt_be_msg_hdr { + uint16_t marker; + uint16_t len; /* Includes header */ +}; +#define MGMTD_BE_MSG_HDR_LEN sizeof(struct mgmt_be_msg_hdr) +#define MGMTD_BE_MSG_MARKER 0xfeed + +struct mgmt_be_msg { + struct mgmt_be_msg_hdr hdr; + uint8_t payload[]; +}; + +struct mgmt_be_client_txn_ctx { + uintptr_t *user_ctx; +}; + +/* + * All the client-specific information this library needs to + * initialize itself, setup connection with MGMTD BackEnd interface + * and carry on all required procedures appropriately. + * + * BackEnd clients need to initialise a instance of this structure + * with appropriate data and pass it while calling the API + * to initialize the library (See mgmt_be_client_lib_init for + * more details). + */ +struct mgmt_be_client_params { + char name[MGMTD_CLIENT_NAME_MAX_LEN]; + uintptr_t user_data; + unsigned long conn_retry_intvl_sec; + + void (*client_connect_notify)(uintptr_t lib_hndl, + uintptr_t usr_data, + bool connected); + + void (*client_subscribe_notify)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct nb_yang_xpath **xpath, + enum mgmt_result subscribe_result[], int num_paths); + + void (*txn_notify)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, bool destroyed); + + enum mgmt_result (*data_validate)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, struct nb_yang_value *data, + bool delete, char *error_if_any); + + enum mgmt_result (*data_apply)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, struct nb_yang_value *data, + bool delete); + + enum mgmt_result (*get_data_elem)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, struct nb_yang_xpath_elem *elem); + + enum mgmt_result (*get_data)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, bool keys_only, + struct nb_yang_xpath_elem **elems, int *num_elems, + int *next_key); + + enum mgmt_result (*get_next_data)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, bool keys_only, + struct nb_yang_xpath_elem **elems, int *num_elems); +}; + +/*************************************************************** + * Global data exported + ***************************************************************/ + +extern const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1]; + +static inline const char *mgmt_be_client_id2name(enum mgmt_be_client_id id) +{ + if (id > MGMTD_BE_CLIENT_ID_MAX) + id = MGMTD_BE_CLIENT_ID_MAX; + return mgmt_be_client_names[id]; +} + +static inline enum mgmt_be_client_id +mgmt_be_client_name2id(const char *name) +{ + enum mgmt_be_client_id id; + + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (!strncmp(mgmt_be_client_names[id], name, + MGMTD_CLIENT_NAME_MAX_LEN)) + return id; + } + + return MGMTD_BE_CLIENT_ID_MAX; +} + +/*************************************************************** + * API prototypes + ***************************************************************/ + +/* + * Initialize library and try connecting with MGMTD. + * + * params + * Backend client parameters. + * + * master_thread + * Thread master. + * + * Returns: + * Backend client lib handler (nothing but address of mgmt_be_client_ctx) + */ +extern uintptr_t +mgmt_be_client_lib_init(struct mgmt_be_client_params *params, + struct thread_master *master_thread); + +/* + * Subscribe with MGMTD for one or more YANG subtree(s). + * + * lib_hndl + * Client library handler. + * + * reg_yang_xpaths + * Yang xpath(s) that needs to be subscribed to. + * + * num_xpaths + * Number of xpaths + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result mgmt_be_subscribe_yang_data(uintptr_t lib_hndl, + char **reg_yang_xpaths, + int num_xpaths); + +/* + * Send one or more YANG notifications to MGMTD daemon. + * + * lib_hndl + * Client library handler. + * + * data_elems + * Yang data elements from data tree. + * + * num_elems + * Number of data elements. + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result +mgmt_be_send_yang_notify(uintptr_t lib_hndl, Mgmtd__YangData **data_elems, + int num_elems); + +/* + * Un-subscribe with MGMTD for one or more YANG subtree(s). + * + * lib_hndl + * Client library handler. + * + * reg_yang_xpaths + * Yang xpath(s) that needs to be un-subscribed from. + * + * num_reg_xpaths + * Number of subscribed xpaths + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +enum mgmt_result mgmt_be_unsubscribe_yang_data(uintptr_t lib_hndl, + char **reg_yang_xpaths, + int num_reg_xpaths); + +/* + * Destroy library and cleanup everything. + */ +extern void mgmt_be_client_lib_destroy(uintptr_t lib_hndl); + +#ifdef __cplusplus +} +#endif + +#endif /* _FRR_MGMTD_BE_CLIENT_H_ */ diff --git a/lib/northbound.c b/lib/northbound.c index 0d4c3f0f0d..b9b840a60d 100644 --- a/lib/northbound.c +++ b/lib/northbound.c @@ -93,7 +93,9 @@ static int nb_node_new_cb(const struct lysc_node *snode, void *arg) { struct nb_node *nb_node; struct lysc_node *sparent, *sparent_list; + struct frr_yang_module_info *module; + module = (struct frr_yang_module_info *)arg; nb_node = XCALLOC(MTYPE_NB_NODE, sizeof(*nb_node)); yang_snode_get_path(snode, YANG_PATH_DATA, nb_node->xpath, sizeof(nb_node->xpath)); @@ -128,6 +130,9 @@ static int nb_node_new_cb(const struct lysc_node *snode, void *arg) assert(snode->priv == NULL); ((struct lysc_node *)snode)->priv = nb_node; + if (module && module->ignore_cbs) + SET_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS); + return YANG_ITER_CONTINUE; } @@ -230,6 +235,9 @@ static unsigned int nb_node_validate_cbs(const struct nb_node *nb_node) { unsigned int error = 0; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return error; + error += nb_node_validate_cb(nb_node, NB_OP_CREATE, !!nb_node->cbs.create, false); error += nb_node_validate_cb(nb_node, NB_OP_MODIFY, @@ -297,6 +305,8 @@ struct nb_config *nb_config_new(struct lyd_node *dnode) config->dnode = yang_dnode_new(ly_native_ctx, true); config->version = 0; + RB_INIT(nb_config_cbs, &config->cfg_chgs); + return config; } @@ -304,6 +314,7 @@ void nb_config_free(struct nb_config *config) { if (config->dnode) yang_dnode_free(config->dnode); + nb_config_diff_del_changes(&config->cfg_chgs); XFREE(MTYPE_NB_CONFIG, config); } @@ -315,6 +326,8 @@ struct nb_config *nb_config_dup(const struct nb_config *config) dup->dnode = yang_dnode_dup(config->dnode); dup->version = config->version; + RB_INIT(nb_config_cbs, &dup->cfg_chgs); + return dup; } @@ -405,7 +418,7 @@ static void nb_config_diff_add_change(struct nb_config_cbs *changes, RB_INSERT(nb_config_cbs, changes, &change->cb); } -static void nb_config_diff_del_changes(struct nb_config_cbs *changes) +void nb_config_diff_del_changes(struct nb_config_cbs *changes) { while (!RB_EMPTY(nb_config_cbs, changes)) { struct nb_config_change *change; @@ -422,8 +435,8 @@ static void nb_config_diff_del_changes(struct nb_config_cbs *changes) * configurations. Given a new subtree, calculate all new YANG data nodes, * excluding default leafs and leaf-lists. This is a recursive function. */ -static void nb_config_diff_created(const struct lyd_node *dnode, uint32_t *seq, - struct nb_config_cbs *changes) +void nb_config_diff_created(const struct lyd_node *dnode, uint32_t *seq, + struct nb_config_cbs *changes) { enum nb_operation operation; struct lyd_node *child; @@ -525,10 +538,16 @@ static inline void nb_config_diff_dnode_log(const char *context, } #endif -/* Calculate the delta between two different configurations. */ -static void nb_config_diff(const struct nb_config *config1, - const struct nb_config *config2, - struct nb_config_cbs *changes) +/* + * Calculate the delta between two different configurations. + * + * NOTE: 'config1' is the reference DB, while 'config2' is + * the DB being compared against 'config1'. Typically 'config1' + * should be the Running DB and 'config2' is the Candidate DB. + */ +void nb_config_diff(const struct nb_config *config1, + const struct nb_config *config2, + struct nb_config_cbs *changes) { struct lyd_node *diff = NULL; const struct lyd_node *root, *dnode; @@ -734,6 +753,169 @@ int nb_candidate_edit(struct nb_config *candidate, return NB_OK; } +static void nb_update_candidate_changes(struct nb_config *candidate, + struct nb_cfg_change *change, + uint32_t *seq) +{ + enum nb_operation oper = change->operation; + char *xpath = change->xpath; + struct lyd_node *root = NULL; + struct lyd_node *dnode; + struct nb_config_cbs *cfg_chgs = &candidate->cfg_chgs; + int op; + + switch (oper) { + case NB_OP_CREATE: + case NB_OP_MODIFY: + root = yang_dnode_get(candidate->dnode, xpath); + break; + case NB_OP_DESTROY: + root = yang_dnode_get(running_config->dnode, xpath); + /* code */ + break; + case NB_OP_MOVE: + case NB_OP_PRE_VALIDATE: + case NB_OP_APPLY_FINISH: + case NB_OP_GET_ELEM: + case NB_OP_GET_NEXT: + case NB_OP_GET_KEYS: + case NB_OP_LOOKUP_ENTRY: + case NB_OP_RPC: + break; + default: + assert(!"non-enum value, invalid"); + } + + if (!root) + return; + + LYD_TREE_DFS_BEGIN (root, dnode) { + op = nb_lyd_diff_get_op(dnode); + switch (op) { + case 'c': + nb_config_diff_created(dnode, seq, cfg_chgs); + LYD_TREE_DFS_continue = 1; + break; + case 'd': + nb_config_diff_deleted(dnode, seq, cfg_chgs); + LYD_TREE_DFS_continue = 1; + break; + case 'r': + nb_config_diff_add_change(cfg_chgs, NB_OP_MODIFY, seq, + dnode); + break; + default: + break; + } + LYD_TREE_DFS_END(root, dnode); + } +} + +static bool nb_is_operation_allowed(struct nb_node *nb_node, + struct nb_cfg_change *change) +{ + enum nb_operation oper = change->operation; + + if (lysc_is_key(nb_node->snode)) { + if (oper == NB_OP_MODIFY || oper == NB_OP_DESTROY) + return false; + } + return true; +} + +void nb_candidate_edit_config_changes( + struct nb_config *candidate_config, struct nb_cfg_change cfg_changes[], + size_t num_cfg_changes, const char *xpath_base, const char *curr_xpath, + int xpath_index, char *err_buf, int err_bufsize, bool *error) +{ + uint32_t seq = 0; + + if (error) + *error = false; + + if (xpath_base == NULL) + xpath_base = ""; + + /* Edit candidate configuration. */ + for (size_t i = 0; i < num_cfg_changes; i++) { + struct nb_cfg_change *change = &cfg_changes[i]; + struct nb_node *nb_node; + char xpath[XPATH_MAXLEN]; + struct yang_data *data; + int ret; + + /* Handle relative XPaths. */ + memset(xpath, 0, sizeof(xpath)); + if (xpath_index > 0 + && (xpath_base[0] == '.' || change->xpath[0] == '.')) + strlcpy(xpath, curr_xpath, sizeof(xpath)); + if (xpath_base[0]) { + if (xpath_base[0] == '.') + strlcat(xpath, xpath_base + 1, sizeof(xpath)); + else + strlcat(xpath, xpath_base, sizeof(xpath)); + } + if (change->xpath[0] == '.') + strlcat(xpath, change->xpath + 1, sizeof(xpath)); + else + strlcpy(xpath, change->xpath, sizeof(xpath)); + + /* Find the northbound node associated to the data path. */ + nb_node = nb_node_find(xpath); + if (!nb_node) { + flog_warn(EC_LIB_YANG_UNKNOWN_DATA_PATH, + "%s: unknown data path: %s", __func__, xpath); + if (error) + *error = true; + continue; + } + /* Find if the node to be edited is not a key node */ + if (!nb_is_operation_allowed(nb_node, change)) { + zlog_err(" Xpath %s points to key node", xpath); + if (error) + *error = true; + break; + } + + /* If the value is not set, get the default if it exists. */ + if (change->value == NULL) + change->value = yang_snode_get_default(nb_node->snode); + data = yang_data_new(xpath, change->value); + + /* + * Ignore "not found" errors when editing the candidate + * configuration. + */ + ret = nb_candidate_edit(candidate_config, nb_node, + change->operation, xpath, NULL, data); + yang_data_free(data); + if (ret != NB_OK && ret != NB_ERR_NOT_FOUND) { + flog_warn( + EC_LIB_NB_CANDIDATE_EDIT_ERROR, + "%s: failed to edit candidate configuration: operation [%s] xpath [%s]", + __func__, nb_operation_name(change->operation), + xpath); + if (error) + *error = true; + continue; + } + nb_update_candidate_changes(candidate_config, change, &seq); + } + + if (error && *error) { + char buf[BUFSIZ]; + + /* + * Failure to edit the candidate configuration should never + * happen in practice, unless there's a bug in the code. When + * that happens, log the error but otherwise ignore it. + */ + snprintf(err_buf, err_bufsize, + "%% Failed to edit configuration.\n\n%s", + yang_print_errors(ly_native_ctx, buf, sizeof(buf))); + } +} + bool nb_candidate_needs_update(const struct nb_config *candidate) { if (candidate->version < running_config->version) @@ -761,8 +943,8 @@ int nb_candidate_update(struct nb_config *candidate) * WARNING: lyd_validate() can change the configuration as part of the * validation process. */ -static int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, - size_t errmsg_len) +int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, + size_t errmsg_len) { if (lyd_validate_all(&candidate->dnode, ly_native_ctx, LYD_VALIDATE_NO_STATE, NULL) @@ -775,10 +957,10 @@ static int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, } /* Perform code-level validation using the northbound callbacks. */ -static int nb_candidate_validate_code(struct nb_context *context, - struct nb_config *candidate, - struct nb_config_cbs *changes, - char *errmsg, size_t errmsg_len) +int nb_candidate_validate_code(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, char *errmsg, + size_t errmsg_len) { struct nb_config_cb *cb; struct lyd_node *root, *child; @@ -816,6 +998,21 @@ static int nb_candidate_validate_code(struct nb_context *context, return NB_OK; } +int nb_candidate_diff_and_validate_yang(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, + char *errmsg, size_t errmsg_len) +{ + if (nb_candidate_validate_yang(candidate, errmsg, sizeof(errmsg_len)) + != NB_OK) + return NB_ERR_VALIDATION; + + RB_INIT(nb_config_cbs, changes); + nb_config_diff(running_config, candidate, changes); + + return NB_OK; +} + int nb_candidate_validate(struct nb_context *context, struct nb_config *candidate, char *errmsg, size_t errmsg_len) @@ -823,11 +1020,11 @@ int nb_candidate_validate(struct nb_context *context, struct nb_config_cbs changes; int ret; - if (nb_candidate_validate_yang(candidate, errmsg, errmsg_len) != NB_OK) - return NB_ERR_VALIDATION; + ret = nb_candidate_diff_and_validate_yang(context, candidate, &changes, + errmsg, errmsg_len); + if (ret != NB_OK) + return ret; - RB_INIT(nb_config_cbs, &changes); - nb_config_diff(running_config, candidate, &changes); ret = nb_candidate_validate_code(context, candidate, &changes, errmsg, errmsg_len); nb_config_diff_del_changes(&changes); @@ -839,12 +1036,14 @@ int nb_candidate_commit_prepare(struct nb_context context, struct nb_config *candidate, const char *comment, struct nb_transaction **transaction, + bool skip_validate, bool ignore_zero_change, char *errmsg, size_t errmsg_len) { struct nb_config_cbs changes; - if (nb_candidate_validate_yang(candidate, errmsg, errmsg_len) - != NB_OK) { + if (!skip_validate + && nb_candidate_validate_yang(candidate, errmsg, errmsg_len) + != NB_OK) { flog_warn(EC_LIB_NB_CANDIDATE_INVALID, "%s: failed to validate candidate configuration", __func__); @@ -853,15 +1052,17 @@ int nb_candidate_commit_prepare(struct nb_context context, RB_INIT(nb_config_cbs, &changes); nb_config_diff(running_config, candidate, &changes); - if (RB_EMPTY(nb_config_cbs, &changes)) { + if (!ignore_zero_change && RB_EMPTY(nb_config_cbs, &changes)) { snprintf( errmsg, errmsg_len, "No changes to apply were found during preparation phase"); return NB_ERR_NO_CHANGES; } - if (nb_candidate_validate_code(&context, candidate, &changes, errmsg, - errmsg_len) != NB_OK) { + if (!skip_validate + && nb_candidate_validate_code(&context, candidate, &changes, errmsg, + errmsg_len) + != NB_OK) { flog_warn(EC_LIB_NB_CANDIDATE_INVALID, "%s: failed to validate candidate configuration", __func__); @@ -869,8 +1070,12 @@ int nb_candidate_commit_prepare(struct nb_context context, return NB_ERR_VALIDATION; } - *transaction = nb_transaction_new(context, candidate, &changes, comment, - errmsg, errmsg_len); + /* + * Re-use an existing transaction if provided. Else allocate a new one. + */ + if (!*transaction) + *transaction = nb_transaction_new(context, candidate, &changes, + comment, errmsg, errmsg_len); if (*transaction == NULL) { flog_warn(EC_LIB_NB_TRANSACTION_CREATION_FAILED, "%s: failed to create transaction: %s", __func__, @@ -921,7 +1126,8 @@ int nb_candidate_commit(struct nb_context context, struct nb_config *candidate, int ret; ret = nb_candidate_commit_prepare(context, candidate, comment, - &transaction, errmsg, errmsg_len); + &transaction, false, false, errmsg, + errmsg_len); /* * Apply the changes if the preparation phase succeeded. Otherwise abort * the transaction. @@ -1015,6 +1221,8 @@ static int nb_callback_create(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_CREATE, dnode); args.context = context; @@ -1064,6 +1272,8 @@ static int nb_callback_modify(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_MODIFY, dnode); args.context = context; @@ -1113,6 +1323,8 @@ static int nb_callback_destroy(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_DESTROY, dnode); args.context = context; @@ -1156,6 +1368,8 @@ static int nb_callback_move(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_MOVE, dnode); args.context = context; @@ -1199,6 +1413,9 @@ static int nb_callback_pre_validate(struct nb_context *context, bool unexpected_error = false; int ret; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return 0; + nb_log_config_callback(NB_EV_VALIDATE, NB_OP_PRE_VALIDATE, dnode); args.dnode = dnode; @@ -1230,6 +1447,9 @@ static void nb_callback_apply_finish(struct nb_context *context, { struct nb_cb_apply_finish_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return; + nb_log_config_callback(NB_EV_APPLY, NB_OP_APPLY_FINISH, dnode); args.context = context; @@ -1245,6 +1465,9 @@ struct yang_data *nb_callback_get_elem(const struct nb_node *nb_node, { struct nb_cb_get_elem_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NULL; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (get_elem): xpath [%s] list_entry [%p]", xpath, list_entry); @@ -1260,6 +1483,9 @@ const void *nb_callback_get_next(const struct nb_node *nb_node, { struct nb_cb_get_next_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NULL; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (get_next): node [%s] parent_list_entry [%p] list_entry [%p]", nb_node->xpath, parent_list_entry, list_entry); @@ -1274,6 +1500,9 @@ int nb_callback_get_keys(const struct nb_node *nb_node, const void *list_entry, { struct nb_cb_get_keys_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return 0; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (get_keys): node [%s] list_entry [%p]", nb_node->xpath, list_entry); @@ -1289,6 +1518,9 @@ const void *nb_callback_lookup_entry(const struct nb_node *nb_node, { struct nb_cb_lookup_entry_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NULL; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (lookup_entry): node [%s] parent_list_entry [%p]", nb_node->xpath, parent_list_entry); @@ -1304,6 +1536,9 @@ int nb_callback_rpc(const struct nb_node *nb_node, const char *xpath, { struct nb_cb_rpc_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return 0; + DEBUGD(&nb_dbg_cbs_rpc, "northbound RPC: %s", xpath); args.xpath = xpath; @@ -1330,6 +1565,9 @@ static int nb_callback_configuration(struct nb_context *context, union nb_resource *resource; int ret = NB_ERR; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NB_OK; + if (event == NB_EV_VALIDATE) resource = NULL; else @@ -1733,7 +1971,7 @@ static int nb_oper_data_iter_list(const struct nb_node *nb_node, /* Iterate over all list entries. */ do { const struct lysc_node_leaf *skey; - struct yang_list_keys list_keys; + struct yang_list_keys list_keys = {}; char xpath[XPATH_MAXLEN * 2]; int ret; @@ -2391,6 +2629,8 @@ const char *nb_client_name(enum nb_client client) return "Pcep"; case NB_CLIENT_MGMTD_SERVER: return "MGMTD Server"; + case NB_CLIENT_MGMTD_BE: + return "MGMT Backend"; case NB_CLIENT_NONE: return "None"; } @@ -2400,6 +2640,10 @@ const char *nb_client_name(enum nb_client client) static void nb_load_callbacks(const struct frr_yang_module_info *module) { + + if (module->ignore_cbs) + return; + for (size_t i = 0; module->nodes[i].xpath; i++) { struct nb_node *nb_node; uint32_t priority; @@ -2473,7 +2717,8 @@ void nb_init(struct thread_master *tm, /* Initialize the compiled nodes with northbound data */ for (size_t i = 0; i < nmodules; i++) { - yang_snodes_iterate(loaded[i]->info, nb_node_new_cb, 0, NULL); + yang_snodes_iterate(loaded[i]->info, nb_node_new_cb, 0, + (void *)modules[i]); nb_load_callbacks(modules[i]); } diff --git a/lib/northbound.h b/lib/northbound.h index 572d669dc1..b63b216b0d 100644 --- a/lib/northbound.h +++ b/lib/northbound.h @@ -22,6 +22,39 @@ extern "C" { struct vty; struct debug; +struct nb_yang_xpath_tag { + uint32_t ns; + uint32_t id; +}; + +struct nb_yang_value { + struct lyd_value value; + LY_DATA_TYPE value_type; + uint8_t value_flags; +}; + +struct nb_yang_xpath_elem { + struct nb_yang_xpath_tag tag; + struct nb_yang_value val; +}; + +#define NB_MAX_NUM_KEYS UINT8_MAX +#define NB_MAX_NUM_XPATH_TAGS UINT8_MAX + +struct nb_yang_xpath { + uint8_t length; + struct { + uint8_t num_keys; + struct nb_yang_xpath_elem keys[NB_MAX_NUM_KEYS]; + } tags[NB_MAX_NUM_XPATH_TAGS]; +}; + +#define NB_YANG_XPATH_KEY(__xpath, __indx1, __indx2) \ + ((__xpath->num_tags > __indx1) \ + && (__xpath->tags[__indx1].num_keys > __indx2) \ + ? &__xpath->tags[__indx1].keys[__indx2] \ + : NULL) + /* Northbound events. */ enum nb_event { /* @@ -564,6 +597,8 @@ struct nb_node { #define F_NB_NODE_CONFIG_ONLY 0x01 /* The YANG list doesn't contain key leafs. */ #define F_NB_NODE_KEYLESS_LIST 0x02 +/* Ignore callbacks for this node */ +#define F_NB_NODE_IGNORE_CBS 0x04 /* * HACK: old gcc versions (< 5.x) have a bug that prevents C99 flexible arrays @@ -576,6 +611,12 @@ struct frr_yang_module_info { /* YANG module name. */ const char *name; + /* + * Ignore callbacks for this module. Set this to true to + * load module without any callbacks. + */ + bool ignore_cbs; + /* Northbound callbacks. */ const struct { /* Data path of this YANG node. */ @@ -620,6 +661,7 @@ enum nb_client { NB_CLIENT_GRPC, NB_CLIENT_PCEP, NB_CLIENT_MGMTD_SERVER, + NB_CLIENT_MGMTD_BE, }; /* Northbound context. */ @@ -631,12 +673,6 @@ struct nb_context { const void *user; }; -/* Northbound configuration. */ -struct nb_config { - struct lyd_node *dnode; - uint32_t version; -}; - /* Northbound configuration callback. */ struct nb_config_cb { RB_ENTRY(nb_config_cb) entry; @@ -663,6 +699,13 @@ struct nb_transaction { struct nb_config_cbs changes; }; +/* Northbound configuration. */ +struct nb_config { + struct lyd_node *dnode; + uint32_t version; + struct nb_config_cbs cfg_chgs; +}; + /* Callback function used by nb_oper_data_iterate(). */ typedef int (*nb_oper_data_cb)(const struct lysc_node *snode, struct yang_translator *translator, @@ -832,6 +875,9 @@ extern int nb_candidate_edit(struct nb_config *candidate, const struct yang_data *previous, const struct yang_data *data); +extern void nb_config_diff_created(const struct lyd_node *dnode, uint32_t *seq, + struct nb_config_cbs *changes); + /* * Check if a candidate configuration is outdated and needs to be updated. * @@ -843,6 +889,30 @@ extern int nb_candidate_edit(struct nb_config *candidate, */ extern bool nb_candidate_needs_update(const struct nb_config *candidate); +extern void nb_candidate_edit_config_changes( + struct nb_config *candidate_config, struct nb_cfg_change cfg_changes[], + size_t num_cfg_changes, const char *xpath_base, const char *curr_xpath, + int xpath_index, char *err_buf, int err_bufsize, bool *error); + +extern void nb_config_diff_del_changes(struct nb_config_cbs *changes); + +extern int nb_candidate_diff_and_validate_yang(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, + char *errmsg, size_t errmsg_len); + +extern void nb_config_diff(const struct nb_config *reference, + const struct nb_config *incremental, + struct nb_config_cbs *changes); + +extern int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, + size_t errmsg_len); + +extern int nb_candidate_validate_code(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, + char *errmsg, size_t errmsg_len); + /* * Update a candidate configuration by rebasing the changes on top of the latest * running configuration. Resolve conflicts automatically by giving preference @@ -922,7 +992,9 @@ extern int nb_candidate_commit_prepare(struct nb_context context, struct nb_config *candidate, const char *comment, struct nb_transaction **transaction, - char *errmsg, size_t errmsg_len); + bool skip_validate, + bool ignore_zero_change, char *errmsg, + size_t errmsg_len); /* * Abort a previously created configuration transaction, releasing all resources diff --git a/lib/northbound_cli.c b/lib/northbound_cli.c index 27db2059bb..2ead14cc39 100644 --- a/lib/northbound_cli.c +++ b/lib/northbound_cli.c @@ -141,79 +141,21 @@ static int nb_cli_apply_changes_internal(struct vty *vty, bool clear_pending) { bool error = false; - - if (xpath_base == NULL) - xpath_base = ""; + char buf[BUFSIZ]; VTY_CHECK_XPATH; - /* Edit candidate configuration. */ - for (size_t i = 0; i < vty->num_cfg_changes; i++) { - struct nb_cfg_change *change = &vty->cfg_changes[i]; - struct nb_node *nb_node; - char xpath[XPATH_MAXLEN]; - struct yang_data *data; - int ret; - - /* Handle relative XPaths. */ - memset(xpath, 0, sizeof(xpath)); - if (vty->xpath_index > 0 - && (xpath_base[0] == '.' || change->xpath[0] == '.')) - strlcpy(xpath, VTY_CURR_XPATH, sizeof(xpath)); - if (xpath_base[0]) { - if (xpath_base[0] == '.') - strlcat(xpath, xpath_base + 1, sizeof(xpath)); - else - strlcat(xpath, xpath_base, sizeof(xpath)); - } - if (change->xpath[0] == '.') - strlcat(xpath, change->xpath + 1, sizeof(xpath)); - else - strlcpy(xpath, change->xpath, sizeof(xpath)); - - /* Find the northbound node associated to the data path. */ - nb_node = nb_node_find(xpath); - if (!nb_node) { - flog_warn(EC_LIB_YANG_UNKNOWN_DATA_PATH, - "%s: unknown data path: %s", __func__, xpath); - error = true; - continue; - } - - /* If the value is not set, get the default if it exists. */ - if (change->value == NULL) - change->value = yang_snode_get_default(nb_node->snode); - data = yang_data_new(xpath, change->value); - - /* - * Ignore "not found" errors when editing the candidate - * configuration. - */ - ret = nb_candidate_edit(vty->candidate_config, nb_node, - change->operation, xpath, NULL, data); - yang_data_free(data); - if (ret != NB_OK && ret != NB_ERR_NOT_FOUND) { - flog_warn( - EC_LIB_NB_CANDIDATE_EDIT_ERROR, - "%s: failed to edit candidate configuration: operation [%s] xpath [%s]", - __func__, nb_operation_name(change->operation), - xpath); - error = true; - continue; - } - } - + nb_candidate_edit_config_changes( + vty->candidate_config, vty->cfg_changes, vty->num_cfg_changes, + xpath_base, VTY_CURR_XPATH, vty->xpath_index, buf, sizeof(buf), + &error); if (error) { - char buf[BUFSIZ]; - /* * Failure to edit the candidate configuration should never * happen in practice, unless there's a bug in the code. When * that happens, log the error but otherwise ignore it. */ - vty_out(vty, "%% Failed to edit configuration.\n\n"); - vty_out(vty, "%s", - yang_print_errors(ly_native_ctx, buf, sizeof(buf))); + vty_out(vty, "%s", buf); } /* diff --git a/lib/northbound_confd.c b/lib/northbound_confd.c index 2b57ff2707..ee19568516 100644 --- a/lib/northbound_confd.c +++ b/lib/northbound_confd.c @@ -312,7 +312,8 @@ static void frr_confd_cdb_read_cb_prepare(int fd, int *subp, int reslen) transaction = NULL; context.client = NB_CLIENT_CONFD; ret = nb_candidate_commit_prepare(context, candidate, NULL, - &transaction, errmsg, sizeof(errmsg)); + &transaction, false, false, errmsg, + sizeof(errmsg)); if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) { enum confd_errcode errcode; diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index 1459146eab..274a0ca45a 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -825,7 +825,8 @@ HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag) grpc_debug("`-> Performing PREPARE"); ret = nb_candidate_commit_prepare( context, candidate->config, comment.c_str(), - &candidate->transaction, errmsg, sizeof(errmsg)); + &candidate->transaction, false, false, errmsg, + sizeof(errmsg)); break; case frr::CommitRequest::ABORT: grpc_debug("`-> Performing ABORT"); diff --git a/lib/northbound_sysrepo.c b/lib/northbound_sysrepo.c index 096414ff24..337fb690d1 100644 --- a/lib/northbound_sysrepo.c +++ b/lib/northbound_sysrepo.c @@ -269,7 +269,8 @@ static int frr_sr_config_change_cb_prepare(sr_session_ctx_t *session, * required to apply them. */ ret = nb_candidate_commit_prepare(context, candidate, NULL, - &transaction, errmsg, sizeof(errmsg)); + &transaction, false, false, errmsg, + sizeof(errmsg)); if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) flog_warn( EC_LIB_LIBSYSREPO, diff --git a/lib/subdir.am b/lib/subdir.am index eb7cbc4f4f..84a35417e5 100644 --- a/lib/subdir.am +++ b/lib/subdir.am @@ -64,6 +64,7 @@ lib_libfrr_la_SOURCES = \ lib/log_vty.c \ lib/md5.c \ lib/memory.c \ + lib/mgmt_be_client.c \ lib/mgmt_fe_client.c \ lib/mlag.c \ lib/module.c \ @@ -241,6 +242,7 @@ pkginclude_HEADERS += \ lib/md5.h \ lib/memory.h \ lib/mgmt.pb-c.h \ + lib/mgmt_be_client.h \ lib/mgmt_fe_client.h \ lib/mgmt_pb.h \ lib/module.h \ |
