]> git.puffer.fish Git - matthieu/frr.git/commitdiff
lib: adapt plugin to use new Sysrepo version
authorRenato Westphal <renato@opensourcerouting.org>
Wed, 19 Aug 2020 23:33:40 +0000 (20:33 -0300)
committerRenato Westphal <renato@opensourcerouting.org>
Thu, 20 Aug 2020 14:53:46 +0000 (11:53 -0300)
Sysrepo recently underwent a complete rewrite, where some substantial
architectural changes were made (the most important one being the
extinction of the sysrepod daemon). While most of the existing API
was preserved, quite a few backward-incompatible changes [1] were
introduced (mostly simplifications). This commit adapts our sysrepo
northbound plugin to those API changes in order for it to be compatible
with the latest Sysrepo version.

Additional notes:
* The old Sysrepo version is EOL and not supported anymore.
* The new Sysrepo version requires libyang 1.x.

Closes #6936

[1] https://github.com/sysrepo/sysrepo/blob/devel/CHANGES

Signed-off-by: Renato Westphal <renato@opensourcerouting.org>
lib/northbound_sysrepo.c
lib/yang.h

index 2209b19c1477ba4365b029c3fa9dea61a9248dd5..3dec685927795536fe84a2ac6247a623a96b264a 100644 (file)
@@ -37,13 +37,11 @@ DEFINE_MTYPE_STATIC(LIB, SYSREPO, "Sysrepo module")
 static struct debug nb_dbg_client_sysrepo = {0, "Northbound client: Sysrepo"};
 
 static struct thread_master *master;
-static struct list *sysrepo_threads;
 static sr_session_ctx_t *session;
 static sr_conn_ctx_t *connection;
 static struct nb_transaction *transaction;
 
 static int frr_sr_read_cb(struct thread *thread);
-static int frr_sr_write_cb(struct thread *thread);
 static int frr_sr_finish(void);
 
 /* Convert FRR YANG data value to sysrepo YANG data value. */
@@ -236,25 +234,23 @@ static int frr_sr_process_change(struct nb_config *candidate,
        return NB_OK;
 }
 
-static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session,
-                                         const char *module_name,
-                                         bool startup_config)
+static int frr_sr_config_change_cb_prepare(sr_session_ctx_t *session,
+                                          const char *module_name,
+                                          bool startup_config)
 {
        sr_change_iter_t *it;
        int ret;
        sr_change_oper_t sr_op;
        sr_val_t *sr_old_val, *sr_new_val;
-       char xpath[XPATH_MAXLEN];
        struct nb_context context = {};
        struct nb_config *candidate;
        char errmsg[BUFSIZ] = {0};
 
-       snprintf(xpath, sizeof(xpath), "/%s:*", module_name);
-       ret = sr_get_changes_iter(session, xpath, &it);
+       ret = sr_get_changes_iter(session, "//*", &it);
        if (ret != SR_ERR_OK) {
                flog_err(EC_LIB_LIBSYSREPO,
-                        "%s: sr_get_changes_iter() failed for xpath %s",
-                        __func__, xpath);
+                        "%s: sr_get_changes_iter() failed for \"%s\"",
+                        __func__, module_name);
                return ret;
        }
 
@@ -307,12 +303,14 @@ static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session,
                                __func__, nb_err_name(ret), errmsg);
        }
 
+       if (!transaction)
+               nb_config_free(candidate);
+
        /* Map northbound return code to sysrepo return code. */
        switch (ret) {
        case NB_OK:
                return SR_ERR_OK;
        case NB_ERR_NO_CHANGES:
-               nb_config_free(candidate);
                return SR_ERR_OK;
        case NB_ERR_LOCKED:
                return SR_ERR_LOCKED;
@@ -356,22 +354,23 @@ static int frr_sr_config_change_cb_abort(sr_session_ctx_t *session,
 
 /* Callback for changes in the running configuration. */
 static int frr_sr_config_change_cb(sr_session_ctx_t *session,
-                                  const char *module_name,
-                                  sr_notif_event_t sr_ev, void *private_ctx)
+                                  const char *module_name, const char *xpath,
+                                  sr_event_t sr_ev, uint32_t request_id,
+                                  void *private_data)
 {
        switch (sr_ev) {
        case SR_EV_ENABLED:
-               return frr_sr_config_change_cb_verify(session, module_name,
-                                                     true);
-       case SR_EV_VERIFY:
-               return frr_sr_config_change_cb_verify(session, module_name,
-                                                     false);
-       case SR_EV_APPLY:
+               return frr_sr_config_change_cb_prepare(session, module_name,
+                                                      true);
+       case SR_EV_CHANGE:
+               return frr_sr_config_change_cb_prepare(session, module_name,
+                                                      false);
+       case SR_EV_DONE:
                return frr_sr_config_change_cb_apply(session, module_name);
        case SR_EV_ABORT:
                return frr_sr_config_change_cb_abort(session, module_name);
        default:
-               flog_err(EC_LIB_LIBSYSREPO, "%s: unknown sysrepo event: %u",
+               flog_err(EC_LIB_LIBSYSREPO, "%s: unexpected sysrepo event: %u",
                         __func__, sr_ev);
                return SR_ERR_INTERNAL;
        }
@@ -381,70 +380,49 @@ static int frr_sr_state_data_iter_cb(const struct lys_node *snode,
                                     struct yang_translator *translator,
                                     struct yang_data *data, void *arg)
 {
-       struct list *elements = arg;
-
-       listnode_add(elements, data);
+       struct lyd_node *dnode = arg;
+
+       ly_errno = 0;
+       dnode = lyd_new_path(dnode, ly_native_ctx, data->xpath, data->value, 0,
+                            LYD_PATH_OPT_UPDATE);
+       if (!dnode && ly_errno) {
+               flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed",
+                         __func__);
+               yang_data_free(data);
+               return NB_ERR;
+       }
 
+       yang_data_free(data);
        return NB_OK;
 }
 
 /* Callback for state retrieval. */
-static int frr_sr_state_cb(const char *xpath, sr_val_t **values,
-                          size_t *values_cnt, uint64_t request_id,
-                          const char *original_xpath, void *private_ctx)
+static int frr_sr_state_cb(sr_session_ctx_t *session, const char *module_name,
+                          const char *xpath, const char *request_xpath,
+                          uint32_t request_id, struct lyd_node **parent,
+                          void *private_ctx)
 {
-       struct list *elements;
-       struct yang_data *data;
-       struct listnode *node;
-       sr_val_t *v;
-       int ret, count, i = 0;
+       struct lyd_node *dnode;
 
-       elements = yang_data_list_new();
-       if (nb_oper_data_iterate(xpath, NULL, NB_OPER_DATA_ITER_NORECURSE,
-                                frr_sr_state_data_iter_cb, elements)
+       dnode = *parent;
+       if (nb_oper_data_iterate(request_xpath, NULL, 0,
+                                frr_sr_state_data_iter_cb, dnode)
            != NB_OK) {
                flog_warn(EC_LIB_NB_OPERATIONAL_DATA,
                          "%s: failed to obtain operational data [xpath %s]",
                          __func__, xpath);
-               goto exit;
-       }
-
-       if (list_isempty(elements))
-               goto exit;
-
-       count = listcount(elements);
-       ret = sr_new_values(count, &v);
-       if (ret != SR_ERR_OK) {
-               flog_err(EC_LIB_LIBSYSREPO, "%s: sr_new_values(): %s", __func__,
-                        sr_strerror(ret));
-               goto exit;
-       }
-
-       for (ALL_LIST_ELEMENTS_RO(elements, node, data)) {
-               if (yang_data_frr2sr(data, &v[i++]) != 0) {
-                       flog_err(EC_LIB_SYSREPO_DATA_CONVERT,
-                                "%s: failed to convert data to sysrepo format",
-                                __func__);
-               }
+               return SR_ERR_INTERNAL;
        }
 
-       *values = v;
-       *values_cnt = count;
-
-       list_delete(&elements);
-
-       return SR_ERR_OK;
-
-exit:
-       list_delete(&elements);
-       *values = NULL;
-       *values_cnt = 0;
+       *parent = dnode;
 
        return SR_ERR_OK;
 }
 
-static int frr_sr_config_rpc_cb(const char *xpath, const sr_val_t *sr_input,
-                               const size_t input_cnt, sr_val_t **sr_output,
+static int frr_sr_config_rpc_cb(sr_session_ctx_t *session, const char *xpath,
+                               const sr_val_t *sr_input,
+                               const size_t input_cnt, sr_event_t sr_ev,
+                               uint32_t request_id, sr_val_t **sr_output,
                                size_t *sr_output_cnt, void *private_ctx)
 {
        struct nb_node *nb_node;
@@ -551,8 +529,7 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments)
                }
        }
 
-       ret = sr_event_notif_send(session, xpath, values, values_cnt,
-                                 SR_EV_NOTIF_DEFAULT);
+       ret = sr_event_notif_send(session, xpath, values, values_cnt);
        if (ret != SR_ERR_OK) {
                flog_err(EC_LIB_LIBSYSREPO,
                         "%s: sr_event_notif_send() failed for xpath %s",
@@ -563,102 +540,13 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments)
        return NB_OK;
 }
 
-/* Code to integrate the sysrepo client into FRR main event loop. */
-struct sysrepo_thread {
-       struct thread *thread;
-       sr_fd_event_t event;
-       int fd;
-};
-
-static struct sysrepo_thread *frr_sr_fd_lookup(sr_fd_event_t event, int fd)
-{
-       struct sysrepo_thread *sr_thread;
-       struct listnode *node;
-
-       for (ALL_LIST_ELEMENTS_RO(sysrepo_threads, node, sr_thread)) {
-               if (sr_thread->event == event && sr_thread->fd == fd)
-                       return sr_thread;
-       }
-
-       return NULL;
-}
-
-static void frr_sr_fd_add(int event, int fd)
-{
-       struct sysrepo_thread *sr_thread;
-
-       if (frr_sr_fd_lookup(event, fd) != NULL)
-               return;
-
-       sr_thread = XCALLOC(MTYPE_SYSREPO, sizeof(*sr_thread));
-       sr_thread->event = event;
-       sr_thread->fd = fd;
-       listnode_add(sysrepo_threads, sr_thread);
-
-       switch (event) {
-       case SR_FD_INPUT_READY:
-               thread_add_read(master, frr_sr_read_cb, NULL, fd,
-                               &sr_thread->thread);
-               break;
-       case SR_FD_OUTPUT_READY:
-               thread_add_write(master, frr_sr_write_cb, NULL, fd,
-                                &sr_thread->thread);
-               break;
-       default:
-               return;
-       }
-}
-
-static void frr_sr_fd_free(struct sysrepo_thread *sr_thread)
-{
-       THREAD_OFF(sr_thread->thread);
-       XFREE(MTYPE_SYSREPO, sr_thread);
-}
-
-static void frr_sr_fd_del(int event, int fd)
-{
-       struct sysrepo_thread *sr_thread;
-
-       sr_thread = frr_sr_fd_lookup(event, fd);
-       if (!sr_thread)
-               return;
-
-       listnode_delete(sysrepo_threads, sr_thread);
-       frr_sr_fd_free(sr_thread);
-}
-
-static void frr_sr_fd_update(sr_fd_change_t *fd_change_set,
-                            size_t fd_change_set_cnt)
-{
-       for (size_t i = 0; i < fd_change_set_cnt; i++) {
-               int fd = fd_change_set[i].fd;
-               int event = fd_change_set[i].events;
-
-               if (event != SR_FD_INPUT_READY && event != SR_FD_OUTPUT_READY)
-                       continue;
-
-               switch (fd_change_set[i].action) {
-               case SR_FD_START_WATCHING:
-                       frr_sr_fd_add(event, fd);
-                       break;
-               case SR_FD_STOP_WATCHING:
-                       frr_sr_fd_del(event, fd);
-                       break;
-               default:
-                       break;
-               }
-       }
-}
-
 static int frr_sr_read_cb(struct thread *thread)
 {
+       sr_subscription_ctx_t *sr_subscription = THREAD_ARG(thread);
        int fd = THREAD_FD(thread);
-       sr_fd_change_t *fd_change_set = NULL;
-       size_t fd_change_set_cnt = 0;
        int ret;
 
-       ret = sr_fd_event_process(fd, SR_FD_INPUT_READY, &fd_change_set,
-                                 &fd_change_set_cnt);
+       ret = sr_process_events(sr_subscription, session, NULL);
        if (ret != SR_ERR_OK) {
                flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s",
                         __func__, sr_strerror(ret));
@@ -666,31 +554,7 @@ static int frr_sr_read_cb(struct thread *thread)
        }
 
        thread = NULL;
-       thread_add_read(master, frr_sr_read_cb, NULL, fd, &thread);
-
-       frr_sr_fd_update(fd_change_set, fd_change_set_cnt);
-       free(fd_change_set);
-
-       return 0;
-}
-
-static int frr_sr_write_cb(struct thread *thread)
-{
-       int fd = THREAD_FD(thread);
-       sr_fd_change_t *fd_change_set = NULL;
-       size_t fd_change_set_cnt = 0;
-       int ret;
-
-       ret = sr_fd_event_process(fd, SR_FD_OUTPUT_READY, &fd_change_set,
-                                 &fd_change_set_cnt);
-       if (ret != SR_ERR_OK) {
-               flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s",
-                        __func__, sr_strerror(ret));
-               return -1;
-       }
-
-       frr_sr_fd_update(fd_change_set, fd_change_set_cnt);
-       free(fd_change_set);
+       thread_add_read(master, frr_sr_read_cb, sr_subscription, fd, &thread);
 
        return 0;
 }
@@ -700,8 +564,8 @@ static void frr_sr_subscribe_config(struct yang_module *module)
        int ret;
 
        ret = sr_module_change_subscribe(
-               session, module->name, frr_sr_config_change_cb, NULL, 0,
-               SR_SUBSCR_DEFAULT | SR_SUBSCR_EV_ENABLED,
+               session, module->name, NULL, frr_sr_config_change_cb, NULL, 0,
+               SR_SUBSCR_DEFAULT | SR_SUBSCR_ENABLED | SR_SUBSCR_NO_THREAD,
                &module->sr_subscription);
        if (ret != SR_ERR_OK)
                flog_err(EC_LIB_LIBSYSREPO, "sr_module_change_subscribe(): %s",
@@ -725,11 +589,11 @@ static int frr_sr_subscribe_state(const struct lys_node *snode, void *arg)
        DEBUGD(&nb_dbg_client_sysrepo, "%s: providing data to '%s'", __func__,
               nb_node->xpath);
 
-       ret = sr_dp_get_items_subscribe(
-               session, nb_node->xpath, frr_sr_state_cb, NULL,
-               SR_SUBSCR_CTX_REUSE, &module->sr_subscription);
+       ret = sr_oper_get_items_subscribe(
+               session, snode->module->name, nb_node->xpath, frr_sr_state_cb,
+               NULL, SR_SUBSCR_CTX_REUSE, &module->sr_subscription);
        if (ret != SR_ERR_OK)
-               flog_err(EC_LIB_LIBSYSREPO, "sr_dp_get_items_subscribe(): %s",
+               flog_err(EC_LIB_LIBSYSREPO, "sr_oper_get_items_subscribe(): %s",
                         sr_strerror(ret));
 
        return YANG_ITER_CONTINUE;
@@ -750,7 +614,7 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg)
               nb_node->xpath);
 
        ret = sr_rpc_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb,
-                              NULL, SR_SUBSCR_CTX_REUSE,
+                              NULL, 0, SR_SUBSCR_CTX_REUSE,
                               &module->sr_subscription);
        if (ret != SR_ERR_OK)
                flog_err(EC_LIB_LIBSYSREPO, "sr_rpc_subscribe(): %s",
@@ -759,30 +623,6 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg)
        return YANG_ITER_CONTINUE;
 }
 
-static int frr_sr_subscribe_action(const struct lys_node *snode, void *arg)
-{
-       struct yang_module *module = arg;
-       struct nb_node *nb_node;
-       int ret;
-
-       if (snode->nodetype != LYS_ACTION)
-               return YANG_ITER_CONTINUE;
-
-       nb_node = snode->priv;
-
-       DEBUGD(&nb_dbg_client_sysrepo, "%s: providing action to '%s'", __func__,
-              nb_node->xpath);
-
-       ret = sr_action_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb,
-                                 NULL, SR_SUBSCR_CTX_REUSE,
-                                 &module->sr_subscription);
-       if (ret != SR_ERR_OK)
-               flog_err(EC_LIB_LIBSYSREPO, "sr_action_subscribe(): %s",
-                        sr_strerror(ret));
-
-       return YANG_ITER_CONTINUE;
-}
-
 /* CLI commands. */
 DEFUN (debug_nb_sr,
        debug_nb_sr_cmd,
@@ -830,22 +670,13 @@ static void frr_sr_cli_init(void)
 }
 
 /* FRR's Sysrepo initialization. */
-static int frr_sr_init(const char *program_name)
+static int frr_sr_init(void)
 {
        struct yang_module *module;
-       int sysrepo_fd, ret;
-
-       sysrepo_threads = list_new();
-
-       ret = sr_fd_watcher_init(&sysrepo_fd, NULL);
-       if (ret != SR_ERR_OK) {
-               flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_fd_watcher_init(): %s",
-                        __func__, sr_strerror(ret));
-               goto cleanup;
-       }
+       int ret;
 
        /* Connect to Sysrepo. */
-       ret = sr_connect(program_name, SR_CONN_DEFAULT, &connection);
+       ret = sr_connect(SR_CONN_DEFAULT, &connection);
        if (ret != SR_ERR_OK) {
                flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_connect(): %s", __func__,
                         sr_strerror(ret));
@@ -853,8 +684,7 @@ static int frr_sr_init(const char *program_name)
        }
 
        /* Start session. */
-       ret = sr_session_start(connection, SR_DS_RUNNING, SR_SESS_DEFAULT,
-                              &session);
+       ret = sr_session_start(connection, SR_DS_RUNNING, &session);
        if (ret != SR_ERR_OK) {
                flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_session_start(): %s",
                         __func__, sr_strerror(ret));
@@ -863,19 +693,28 @@ static int frr_sr_init(const char *program_name)
 
        /* Perform subscriptions. */
        RB_FOREACH (module, yang_modules, &yang_modules) {
+               int event_pipe;
+
                frr_sr_subscribe_config(module);
                yang_snodes_iterate_module(module->info, frr_sr_subscribe_state,
                                           0, module);
                yang_snodes_iterate_module(module->info, frr_sr_subscribe_rpc,
                                           0, module);
-               yang_snodes_iterate_module(module->info,
-                                          frr_sr_subscribe_action, 0, module);
+
+               /* Watch subscriptions. */
+               ret = sr_get_event_pipe(module->sr_subscription, &event_pipe);
+               if (ret != SR_ERR_OK) {
+                       flog_err(EC_LIB_SYSREPO_INIT,
+                                "%s: sr_get_event_pipe(): %s", __func__,
+                                sr_strerror(ret));
+                       goto cleanup;
+               }
+               thread_add_read(master, frr_sr_read_cb, module->sr_subscription,
+                               event_pipe, &module->sr_thread);
        }
 
        hook_register(nb_notification_send, frr_sr_notification_send);
 
-       frr_sr_fd_add(SR_FD_INPUT_READY, sysrepo_fd);
-
        return 0;
 
 cleanup:
@@ -891,7 +730,8 @@ static int frr_sr_finish(void)
        RB_FOREACH (module, yang_modules, &yang_modules) {
                if (!module->sr_subscription)
                        continue;
-               sr_unsubscribe(session, module->sr_subscription);
+               sr_unsubscribe(module->sr_subscription);
+               THREAD_OFF(module->sr_thread);
        }
 
        if (session)
@@ -899,10 +739,6 @@ static int frr_sr_finish(void)
        if (connection)
                sr_disconnect(connection);
 
-       sysrepo_threads->del = (void (*)(void *))frr_sr_fd_free;
-       list_delete(&sysrepo_threads);
-       sr_fd_watcher_cleanup();
-
        return 0;
 }
 
@@ -910,7 +746,7 @@ static int frr_sr_module_late_init(struct thread_master *tm)
 {
        master = tm;
 
-       if (frr_sr_init(frr_get_progname()) < 0) {
+       if (frr_sr_init() < 0) {
                flog_err(EC_LIB_SYSREPO_INIT,
                         "failed to initialize the Sysrepo module");
                return -1;
index cc048c44e8a5f7a3b673a679b940051f3779c97c..94bbed233d55bd799fef0afe3e6378f1b92ebb7d 100644 (file)
@@ -63,6 +63,7 @@ struct yang_module {
 #endif
 #ifdef HAVE_SYSREPO
        sr_subscription_ctx_t *sr_subscription;
+       struct thread *sr_thread;
 #endif
 };
 RB_HEAD(yang_modules, yang_module);