diff options
Diffstat (limited to 'lib/northbound_sysrepo.c')
| -rw-r--r-- | lib/northbound_sysrepo.c | 379 |
1 files changed, 106 insertions, 273 deletions
diff --git a/lib/northbound_sysrepo.c b/lib/northbound_sysrepo.c index 500203173c..b5ef040a3f 100644 --- a/lib/northbound_sysrepo.c +++ b/lib/northbound_sysrepo.c @@ -37,13 +37,11 @@ DEFINE_MTYPE_STATIC(LIB, SYSREPO, "Sysrepo module") static struct debug nb_dbg_client_sysrepo = {0, "Northbound client: Sysrepo"}; static struct thread_master *master; -static struct list *sysrepo_threads; static sr_session_ctx_t *session; static sr_conn_ctx_t *connection; static struct nb_transaction *transaction; static int frr_sr_read_cb(struct thread *thread); -static int frr_sr_write_cb(struct thread *thread); static int frr_sr_finish(void); /* Convert FRR YANG data value to sysrepo YANG data value. */ @@ -176,6 +174,9 @@ static int frr_sr_process_change(struct nb_config *candidate, xpath = sr_data->xpath; + DEBUGD(&nb_dbg_client_sysrepo, "sysrepo: processing change [xpath %s]", + xpath); + /* Non-presence container - nothing to do. */ if (sr_data->type == SR_CONTAINER_T) return NB_OK; @@ -225,7 +226,7 @@ static int frr_sr_process_change(struct nb_config *candidate, ret = nb_candidate_edit(candidate, nb_node, nb_op, xpath, NULL, data); yang_data_free(data); - if (ret != NB_OK) { + if (ret != NB_OK && ret != NB_ERR_NOT_FOUND) { flog_warn( EC_LIB_NB_CANDIDATE_EDIT_ERROR, "%s: failed to edit candidate configuration: operation [%s] xpath [%s]", @@ -236,25 +237,22 @@ static int frr_sr_process_change(struct nb_config *candidate, return NB_OK; } -static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session, - const char *module_name, - bool startup_config) +static int frr_sr_config_change_cb_prepare(sr_session_ctx_t *session, + const char *module_name) { sr_change_iter_t *it; int ret; sr_change_oper_t sr_op; sr_val_t *sr_old_val, *sr_new_val; - char xpath[XPATH_MAXLEN]; struct nb_context context = {}; struct nb_config *candidate; char errmsg[BUFSIZ] = {0}; - snprintf(xpath, sizeof(xpath), "/%s:*", module_name); - ret = sr_get_changes_iter(session, xpath, &it); + ret = sr_get_changes_iter(session, "//*", &it); if (ret != SR_ERR_OK) { flog_err(EC_LIB_LIBSYSREPO, - "%s: sr_get_changes_iter() failed for xpath %s", - __func__, xpath); + "%s: sr_get_changes_iter() failed for \"%s\"", + __func__, module_name); return ret; } @@ -279,40 +277,26 @@ static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session, transaction = NULL; context.client = NB_CLIENT_SYSREPO; - if (startup_config) { - /* - * sysrepod sends the entire startup configuration using a - * single event (SR_EV_ENABLED). This means we need to perform - * the full two-phase commit protocol in one go here. - */ - ret = nb_candidate_commit(&context, candidate, true, NULL, NULL, - errmsg, sizeof(errmsg)); - if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) - flog_warn( - EC_LIB_LIBSYSREPO, - "%s: failed to apply startup configuration: %s (%s)", - __func__, nb_err_name(ret), errmsg); - } else { - /* - * Validate the configuration changes and allocate all resources - * required to apply them. - */ - ret = nb_candidate_commit_prepare(&context, candidate, NULL, - &transaction, errmsg, - sizeof(errmsg)); - if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) - flog_warn( - EC_LIB_LIBSYSREPO, - "%s: failed to prepare configuration transaction: %s (%s)", - __func__, nb_err_name(ret), errmsg); - } + /* + * Validate the configuration changes and allocate all resources + * required to apply them. + */ + ret = nb_candidate_commit_prepare(&context, candidate, NULL, + &transaction, errmsg, sizeof(errmsg)); + if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) + flog_warn( + EC_LIB_LIBSYSREPO, + "%s: failed to prepare configuration transaction: %s (%s)", + __func__, nb_err_name(ret), errmsg); + + if (!transaction) + nb_config_free(candidate); /* Map northbound return code to sysrepo return code. */ switch (ret) { case NB_OK: return SR_ERR_OK; case NB_ERR_NO_CHANGES: - nb_config_free(candidate); return SR_ERR_OK; case NB_ERR_LOCKED: return SR_ERR_LOCKED; @@ -329,8 +313,10 @@ static int frr_sr_config_change_cb_apply(sr_session_ctx_t *session, /* Apply the transaction. */ if (transaction) { struct nb_config *candidate = transaction->config; + char errmsg[BUFSIZ] = {0}; - nb_candidate_commit_apply(transaction, true, NULL); + nb_candidate_commit_apply(transaction, true, NULL, errmsg, + sizeof(errmsg)); nb_config_free(candidate); } @@ -343,8 +329,9 @@ static int frr_sr_config_change_cb_abort(sr_session_ctx_t *session, /* Abort the transaction. */ if (transaction) { struct nb_config *candidate = transaction->config; + char errmsg[BUFSIZ] = {0}; - nb_candidate_commit_abort(transaction); + nb_candidate_commit_abort(transaction, errmsg, sizeof(errmsg)); nb_config_free(candidate); } @@ -353,22 +340,20 @@ static int frr_sr_config_change_cb_abort(sr_session_ctx_t *session, /* Callback for changes in the running configuration. */ static int frr_sr_config_change_cb(sr_session_ctx_t *session, - const char *module_name, - sr_notif_event_t sr_ev, void *private_ctx) + const char *module_name, const char *xpath, + sr_event_t sr_ev, uint32_t request_id, + void *private_data) { switch (sr_ev) { case SR_EV_ENABLED: - return frr_sr_config_change_cb_verify(session, module_name, - true); - case SR_EV_VERIFY: - return frr_sr_config_change_cb_verify(session, module_name, - false); - case SR_EV_APPLY: + case SR_EV_CHANGE: + return frr_sr_config_change_cb_prepare(session, module_name); + case SR_EV_DONE: return frr_sr_config_change_cb_apply(session, module_name); case SR_EV_ABORT: return frr_sr_config_change_cb_abort(session, module_name); default: - flog_err(EC_LIB_LIBSYSREPO, "%s: unknown sysrepo event: %u", + flog_err(EC_LIB_LIBSYSREPO, "%s: unexpected sysrepo event: %u", __func__, sr_ev); return SR_ERR_INTERNAL; } @@ -378,70 +363,49 @@ static int frr_sr_state_data_iter_cb(const struct lys_node *snode, struct yang_translator *translator, struct yang_data *data, void *arg) { - struct list *elements = arg; - - listnode_add(elements, data); + struct lyd_node *dnode = arg; + + ly_errno = 0; + dnode = lyd_new_path(dnode, ly_native_ctx, data->xpath, data->value, 0, + LYD_PATH_OPT_UPDATE); + if (!dnode && ly_errno) { + flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed", + __func__); + yang_data_free(data); + return NB_ERR; + } + yang_data_free(data); return NB_OK; } /* Callback for state retrieval. */ -static int frr_sr_state_cb(const char *xpath, sr_val_t **values, - size_t *values_cnt, uint64_t request_id, - const char *original_xpath, void *private_ctx) +static int frr_sr_state_cb(sr_session_ctx_t *session, const char *module_name, + const char *xpath, const char *request_xpath, + uint32_t request_id, struct lyd_node **parent, + void *private_ctx) { - struct list *elements; - struct yang_data *data; - struct listnode *node; - sr_val_t *v; - int ret, count, i = 0; + struct lyd_node *dnode; - elements = yang_data_list_new(); - if (nb_oper_data_iterate(xpath, NULL, NB_OPER_DATA_ITER_NORECURSE, - frr_sr_state_data_iter_cb, elements) + dnode = *parent; + if (nb_oper_data_iterate(request_xpath, NULL, 0, + frr_sr_state_data_iter_cb, dnode) != NB_OK) { flog_warn(EC_LIB_NB_OPERATIONAL_DATA, "%s: failed to obtain operational data [xpath %s]", __func__, xpath); - goto exit; - } - - if (list_isempty(elements)) - goto exit; - - count = listcount(elements); - ret = sr_new_values(count, &v); - if (ret != SR_ERR_OK) { - flog_err(EC_LIB_LIBSYSREPO, "%s: sr_new_values(): %s", __func__, - sr_strerror(ret)); - goto exit; - } - - for (ALL_LIST_ELEMENTS_RO(elements, node, data)) { - if (yang_data_frr2sr(data, &v[i++]) != 0) { - flog_err(EC_LIB_SYSREPO_DATA_CONVERT, - "%s: failed to convert data to sysrepo format", - __func__); - } + return SR_ERR_INTERNAL; } - *values = v; - *values_cnt = count; - - list_delete(&elements); - - return SR_ERR_OK; - -exit: - list_delete(&elements); - *values = NULL; - *values_cnt = 0; + *parent = dnode; return SR_ERR_OK; } -static int frr_sr_config_rpc_cb(const char *xpath, const sr_val_t *sr_input, - const size_t input_cnt, sr_val_t **sr_output, +static int frr_sr_config_rpc_cb(sr_session_ctx_t *session, const char *xpath, + const sr_val_t *sr_input, + const size_t input_cnt, sr_event_t sr_ev, + uint32_t request_id, sr_val_t **sr_output, size_t *sr_output_cnt, void *private_ctx) { struct nb_node *nb_node; @@ -548,8 +512,7 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments) } } - ret = sr_event_notif_send(session, xpath, values, values_cnt, - SR_EV_NOTIF_DEFAULT); + ret = sr_event_notif_send(session, xpath, values, values_cnt); if (ret != SR_ERR_OK) { flog_err(EC_LIB_LIBSYSREPO, "%s: sr_event_notif_send() failed for xpath %s", @@ -560,102 +523,13 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments) return NB_OK; } -/* Code to integrate the sysrepo client into FRR main event loop. */ -struct sysrepo_thread { - struct thread *thread; - sr_fd_event_t event; - int fd; -}; - -static struct sysrepo_thread *frr_sr_fd_lookup(sr_fd_event_t event, int fd) -{ - struct sysrepo_thread *sr_thread; - struct listnode *node; - - for (ALL_LIST_ELEMENTS_RO(sysrepo_threads, node, sr_thread)) { - if (sr_thread->event == event && sr_thread->fd == fd) - return sr_thread; - } - - return NULL; -} - -static void frr_sr_fd_add(int event, int fd) -{ - struct sysrepo_thread *sr_thread; - - if (frr_sr_fd_lookup(event, fd) != NULL) - return; - - sr_thread = XCALLOC(MTYPE_SYSREPO, sizeof(*sr_thread)); - sr_thread->event = event; - sr_thread->fd = fd; - listnode_add(sysrepo_threads, sr_thread); - - switch (event) { - case SR_FD_INPUT_READY: - thread_add_read(master, frr_sr_read_cb, NULL, fd, - &sr_thread->thread); - break; - case SR_FD_OUTPUT_READY: - thread_add_write(master, frr_sr_write_cb, NULL, fd, - &sr_thread->thread); - break; - default: - return; - } -} - -static void frr_sr_fd_free(struct sysrepo_thread *sr_thread) -{ - THREAD_OFF(sr_thread->thread); - XFREE(MTYPE_SYSREPO, sr_thread); -} - -static void frr_sr_fd_del(int event, int fd) -{ - struct sysrepo_thread *sr_thread; - - sr_thread = frr_sr_fd_lookup(event, fd); - if (!sr_thread) - return; - - listnode_delete(sysrepo_threads, sr_thread); - frr_sr_fd_free(sr_thread); -} - -static void frr_sr_fd_update(sr_fd_change_t *fd_change_set, - size_t fd_change_set_cnt) -{ - for (size_t i = 0; i < fd_change_set_cnt; i++) { - int fd = fd_change_set[i].fd; - int event = fd_change_set[i].events; - - if (event != SR_FD_INPUT_READY && event != SR_FD_OUTPUT_READY) - continue; - - switch (fd_change_set[i].action) { - case SR_FD_START_WATCHING: - frr_sr_fd_add(event, fd); - break; - case SR_FD_STOP_WATCHING: - frr_sr_fd_del(event, fd); - break; - default: - break; - } - } -} - static int frr_sr_read_cb(struct thread *thread) { + sr_subscription_ctx_t *sr_subscription = THREAD_ARG(thread); int fd = THREAD_FD(thread); - sr_fd_change_t *fd_change_set = NULL; - size_t fd_change_set_cnt = 0; int ret; - ret = sr_fd_event_process(fd, SR_FD_INPUT_READY, &fd_change_set, - &fd_change_set_cnt); + ret = sr_process_events(sr_subscription, session, NULL); if (ret != SR_ERR_OK) { flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s", __func__, sr_strerror(ret)); @@ -663,31 +537,7 @@ static int frr_sr_read_cb(struct thread *thread) } thread = NULL; - thread_add_read(master, frr_sr_read_cb, NULL, fd, &thread); - - frr_sr_fd_update(fd_change_set, fd_change_set_cnt); - free(fd_change_set); - - return 0; -} - -static int frr_sr_write_cb(struct thread *thread) -{ - int fd = THREAD_FD(thread); - sr_fd_change_t *fd_change_set = NULL; - size_t fd_change_set_cnt = 0; - int ret; - - ret = sr_fd_event_process(fd, SR_FD_OUTPUT_READY, &fd_change_set, - &fd_change_set_cnt); - if (ret != SR_ERR_OK) { - flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s", - __func__, sr_strerror(ret)); - return -1; - } - - frr_sr_fd_update(fd_change_set, fd_change_set_cnt); - free(fd_change_set); + thread_add_read(master, frr_sr_read_cb, sr_subscription, fd, &thread); return 0; } @@ -696,9 +546,13 @@ static void frr_sr_subscribe_config(struct yang_module *module) { int ret; + DEBUGD(&nb_dbg_client_sysrepo, + "sysrepo: subscribing for configuration changes made in the '%s' module", + module->name); + ret = sr_module_change_subscribe( - session, module->name, frr_sr_config_change_cb, NULL, 0, - SR_SUBSCR_DEFAULT | SR_SUBSCR_EV_ENABLED, + session, module->name, NULL, frr_sr_config_change_cb, NULL, 0, + SR_SUBSCR_DEFAULT | SR_SUBSCR_ENABLED | SR_SUBSCR_NO_THREAD, &module->sr_subscription); if (ret != SR_ERR_OK) flog_err(EC_LIB_LIBSYSREPO, "sr_module_change_subscribe(): %s", @@ -719,14 +573,14 @@ static int frr_sr_subscribe_state(const struct lys_node *snode, void *arg) nb_node = snode->priv; - DEBUGD(&nb_dbg_client_sysrepo, "%s: providing data to '%s'", __func__, + DEBUGD(&nb_dbg_client_sysrepo, "sysrepo: providing data to '%s'", nb_node->xpath); - ret = sr_dp_get_items_subscribe( - session, nb_node->xpath, frr_sr_state_cb, NULL, - SR_SUBSCR_CTX_REUSE, &module->sr_subscription); + ret = sr_oper_get_items_subscribe( + session, snode->module->name, nb_node->xpath, frr_sr_state_cb, + NULL, SR_SUBSCR_CTX_REUSE, &module->sr_subscription); if (ret != SR_ERR_OK) - flog_err(EC_LIB_LIBSYSREPO, "sr_dp_get_items_subscribe(): %s", + flog_err(EC_LIB_LIBSYSREPO, "sr_oper_get_items_subscribe(): %s", sr_strerror(ret)); return YANG_ITER_CONTINUE; @@ -743,11 +597,11 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg) nb_node = snode->priv; - DEBUGD(&nb_dbg_client_sysrepo, "%s: providing RPC to '%s'", __func__, + DEBUGD(&nb_dbg_client_sysrepo, "sysrepo: providing RPC to '%s'", nb_node->xpath); ret = sr_rpc_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb, - NULL, SR_SUBSCR_CTX_REUSE, + NULL, 0, SR_SUBSCR_CTX_REUSE, &module->sr_subscription); if (ret != SR_ERR_OK) flog_err(EC_LIB_LIBSYSREPO, "sr_rpc_subscribe(): %s", @@ -756,30 +610,6 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg) return YANG_ITER_CONTINUE; } -static int frr_sr_subscribe_action(const struct lys_node *snode, void *arg) -{ - struct yang_module *module = arg; - struct nb_node *nb_node; - int ret; - - if (snode->nodetype != LYS_ACTION) - return YANG_ITER_CONTINUE; - - nb_node = snode->priv; - - DEBUGD(&nb_dbg_client_sysrepo, "%s: providing action to '%s'", __func__, - nb_node->xpath); - - ret = sr_action_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb, - NULL, SR_SUBSCR_CTX_REUSE, - &module->sr_subscription); - if (ret != SR_ERR_OK) - flog_err(EC_LIB_LIBSYSREPO, "sr_action_subscribe(): %s", - sr_strerror(ret)); - - return YANG_ITER_CONTINUE; -} - /* CLI commands. */ DEFUN (debug_nb_sr, debug_nb_sr_cmd, @@ -827,22 +657,13 @@ static void frr_sr_cli_init(void) } /* FRR's Sysrepo initialization. */ -static int frr_sr_init(const char *program_name) +static int frr_sr_init(void) { struct yang_module *module; - int sysrepo_fd, ret; - - sysrepo_threads = list_new(); - - ret = sr_fd_watcher_init(&sysrepo_fd, NULL); - if (ret != SR_ERR_OK) { - flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_fd_watcher_init(): %s", - __func__, sr_strerror(ret)); - goto cleanup; - } + int ret; /* Connect to Sysrepo. */ - ret = sr_connect(program_name, SR_CONN_DEFAULT, &connection); + ret = sr_connect(SR_CONN_DEFAULT, &connection); if (ret != SR_ERR_OK) { flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_connect(): %s", __func__, sr_strerror(ret)); @@ -850,8 +671,7 @@ static int frr_sr_init(const char *program_name) } /* Start session. */ - ret = sr_session_start(connection, SR_DS_RUNNING, SR_SESS_DEFAULT, - &session); + ret = sr_session_start(connection, SR_DS_RUNNING, &session); if (ret != SR_ERR_OK) { flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_session_start(): %s", __func__, sr_strerror(ret)); @@ -860,19 +680,28 @@ static int frr_sr_init(const char *program_name) /* Perform subscriptions. */ RB_FOREACH (module, yang_modules, &yang_modules) { + int event_pipe; + frr_sr_subscribe_config(module); yang_snodes_iterate_module(module->info, frr_sr_subscribe_state, 0, module); yang_snodes_iterate_module(module->info, frr_sr_subscribe_rpc, 0, module); - yang_snodes_iterate_module(module->info, - frr_sr_subscribe_action, 0, module); + + /* Watch subscriptions. */ + ret = sr_get_event_pipe(module->sr_subscription, &event_pipe); + if (ret != SR_ERR_OK) { + flog_err(EC_LIB_SYSREPO_INIT, + "%s: sr_get_event_pipe(): %s", __func__, + sr_strerror(ret)); + goto cleanup; + } + thread_add_read(master, frr_sr_read_cb, module->sr_subscription, + event_pipe, &module->sr_thread); } hook_register(nb_notification_send, frr_sr_notification_send); - frr_sr_fd_add(SR_FD_INPUT_READY, sysrepo_fd); - return 0; cleanup: @@ -888,7 +717,8 @@ static int frr_sr_finish(void) RB_FOREACH (module, yang_modules, &yang_modules) { if (!module->sr_subscription) continue; - sr_unsubscribe(session, module->sr_subscription); + sr_unsubscribe(module->sr_subscription); + THREAD_OFF(module->sr_thread); } if (session) @@ -896,24 +726,26 @@ static int frr_sr_finish(void) if (connection) sr_disconnect(connection); - sysrepo_threads->del = (void (*)(void *))frr_sr_fd_free; - list_delete(&sysrepo_threads); - sr_fd_watcher_cleanup(); - return 0; } -static int frr_sr_module_late_init(struct thread_master *tm) +static int frr_sr_module_very_late_init(struct thread_master *tm) { master = tm; - if (frr_sr_init(frr_get_progname()) < 0) { + if (frr_sr_init() < 0) { flog_err(EC_LIB_SYSREPO_INIT, "failed to initialize the Sysrepo module"); return -1; } hook_register(frr_fini, frr_sr_finish); + + return 0; +} + +static int frr_sr_module_late_init(struct thread_master *tm) +{ frr_sr_cli_init(); return 0; @@ -922,6 +754,7 @@ static int frr_sr_module_late_init(struct thread_master *tm) static int frr_sr_module_init(void) { hook_register(frr_late_init, frr_sr_module_late_init); + hook_register(frr_very_late_init, frr_sr_module_very_late_init); return 0; } |
