]> git.puffer.fish Git - mirror/frr.git/commitdiff
lib: notify on datastore (oper-state) changes
authorChristian Hopps <chopps@labn.net>
Sat, 14 Dec 2024 23:26:49 +0000 (18:26 -0500)
committerChristian Hopps <chopps@labn.net>
Tue, 14 Jan 2025 04:40:52 +0000 (23:40 -0500)
Signed-off-by: Christian Hopps <chopps@labn.net>
lib/northbound.c
lib/northbound.h
lib/northbound_notif.c
lib/northbound_oper.c

index c67ed924a95a5c976ba9f6d73d9d76f1f09c7ac4..418cb246f62992f882cc1288d663d0761eea3f5e 100644 (file)
@@ -2754,10 +2754,15 @@ void nb_init(struct event_loop *tm,
 
        /* Initialize oper-state */
        nb_oper_init(tm);
+
+       /* Initialize notification-state */
+       nb_notif_init(tm);
 }
 
 void nb_terminate(void)
 {
+       nb_notif_terminate();
+
        nb_oper_terminate();
 
        /* Terminate the northbound CLI. */
index e3f96c7bc7b6d5f2dfea22a26b794c48b4bd6532..b65cc100036633c16c4888d3aa166e6a69b2f500 100644 (file)
@@ -1512,6 +1512,15 @@ extern void nb_oper_cancel_walk(void *walk);
  */
 extern void nb_oper_cancel_all_walks(void);
 
+/**
+ * nb_oper_walk_finish_arg() - return the finish arg for this walk
+ */
+extern void *nb_oper_walk_finish_arg(void *walk);
+/**
+ * nb_oper_walk_cb_arg() - return the callback arg for this walk
+ */
+extern void *nb_oper_walk_cb_arg(void *walk);
+
 /*
  * Validate if the northbound callback operation is valid for the given node.
  *
@@ -1806,6 +1815,9 @@ extern struct lyd_node *nb_op_updatef(struct lyd_node *tree, const char *path, c
 extern struct lyd_node *nb_op_vupdatef(struct lyd_node *tree, const char *path, const char *val_fmt,
                                       va_list ap);
 
+extern void nb_notif_init(struct event_loop *loop);
+extern void nb_notif_terminate(void);
+
 #ifdef __cplusplus
 }
 #endif
index 42aaa092b8cd08801a2ab5030293aa32df974371..c64771ebedd4430bd80139624cc61d651dd25747 100644 (file)
  */
 #include <zebra.h>
 #include "debug.h"
+#include "lib_errors.h"
 #include "typesafe.h"
 #include "northbound.h"
+#include "mgmt_be_client.h"
 
 #define __dbg(fmt, ...)            DEBUGD(&nb_dbg_notif, "NB_OP_CHANGE: %s: " fmt, __func__, ##__VA_ARGS__)
 #define __log_err(fmt, ...) zlog_err("NB_OP_CHANGE: %s: ERROR: " fmt, __func__, ##__VA_ARGS__)
 
+#define NB_NOTIF_TIMER_MSEC (10) /* 10msec */
+
+/*
+ * ADDS:
+ * - Less specific:
+ *   - Any new add will cause more specific pending adds to be dropped and equal
+ *     or more specific deletes to be dropped.
+ * - More specific:
+ *   - Ignore any new add that is the same or more specific than an existing add.
+ *   - A new add that is more specific than a delete should change the delete
+ *     into an add query (since adds are reported as a replace).
+ *
+ * DELETES:
+ * - Less specific:
+ *   - Any new delete will cause more specific pending deletes to be dropped and
+ *     equal or more specific adds to be dropped.
+ * - More specific:
+ *   - Ignore new deletes that are the same or more specific than existing
+ *     deletes.
+ *   - A new delete that is more specific than an add can be dropped since we
+ *     use replacement methodology for the add.
+ *
+ * One thing we have to pay close attention to is that the state is going to be
+ * queried when the notification sent, not when we are told of the change.
+ */
+
+DEFINE_MTYPE_STATIC(LIB, OP_CHANGE, "NB Oper Change");
+DEFINE_MTYPE_STATIC(LIB, OP_CHANGES_GROUP, "NB Oper Changes Group");
+DEFINE_MTYPE_STATIC(LIB, NB_NOTIF_WALK_ARGS, "NB Notify Oper Walk");
+
+struct op_change {
+       RB_ENTRY(op_change) link;
+       char path[];
+};
+
+/*
+ * RB tree for op_change
+ */
+static int op_change_cmp(const struct op_change *e1, const struct op_change *e2);
+RB_HEAD(op_changes, op_change);
+RB_PROTOTYPE(op_changes, op_change, link, op_change_cmp)
+RB_GENERATE(op_changes, op_change, link, op_change_cmp)
+
+struct op_changes nb_notif_adds = RB_INITIALIZER(&nb_notif_adds);
+struct op_changes nb_notif_dels = RB_INITIALIZER(&nb_notif_dels);
+struct event_loop *nb_notif_master;
+struct event *nb_notif_timer;
+void *nb_notif_walk;
+
+/*
+ * We maintain a queue of change lists one entry per query and notification send
+ * action
+ */
+PREDECL_LIST(op_changes_queue);
+struct op_changes_group {
+       struct op_changes_queue_item item;
+       struct op_changes adds;
+       struct op_changes dels;
+       struct op_changes *cur_changes; /* used when walking */
+       struct op_change *cur_change;   /* "    "     " */
+};
+
+DECLARE_LIST(op_changes_queue, struct op_changes_group, item);
+static struct op_changes_queue_head op_changes_queue;
+
+struct nb_notif_walk_args {
+       struct op_changes_group *group;
+       struct lyd_node *tree;
+};
+
+static void nb_notif_set_walk_timer(void);
+
+
+static int pathncmp(const char *s1, const char *s2, size_t n)
+{
+       size_t i = 0;
+
+       while (i < n && *s1 && *s2) {
+               char c1 = *s1;
+               char c2 = *s2;
+
+               if ((c1 == '\'' && c2 == '\"') || (c1 == '\"' && c2 == '\'')) {
+                       s1++;
+                       s2++;
+                       i++;
+                       continue;
+               }
+               if (c1 != c2)
+                       return (unsigned char)c1 - (unsigned char)c2;
+               s1++;
+               s2++;
+               i++;
+       }
+       if (i < n)
+               return (unsigned char)*s1 - (unsigned char)*s2;
+       return 0;
+}
+
+static int pathcmp(const char *s1, const char *s2)
+{
+       while (*s1 && *s2) {
+               char c1 = *s1;
+               char c2 = *s2;
+
+               if ((c1 == '\'' && c2 == '\"') || (c1 == '\"' && c2 == '\'')) {
+                       s1++;
+                       s2++;
+                       continue;
+               }
+               if (c1 != c2)
+                       return (unsigned char)c1 - (unsigned char)c2;
+               s1++;
+               s2++;
+       }
+       return (unsigned char)*s1 - (unsigned char)*s2;
+}
+
+
+static int op_change_cmp(const struct op_change *e1, const struct op_change *e2)
+{
+       return pathcmp(e1->path, e2->path);
+}
+
+static struct op_change *op_change_alloc(const char *path)
+{
+       struct op_change *note;
+       size_t ssize = strlen(path) + 1;
+
+       note = XMALLOC(MTYPE_OP_CHANGE, sizeof(*note) + ssize);
+       memset(note, 0, sizeof(*note));
+       strlcpy(note->path, path, ssize);
+
+       return note;
+}
+
+static void op_change_free(struct op_change *note)
+{
+       XFREE(MTYPE_OP_CHANGE, note);
+}
+
+/**
+ * op_changes_group_push() - Save the current set of changes on the queue.
+ *
+ * This function will save the current set of changes on the queue and
+ * initialize a new set of changes.
+ */
+static void op_changes_group_push(void)
+{
+       struct op_changes_group *changes;
+
+       if (RB_EMPTY(op_changes, &nb_notif_adds) && RB_EMPTY(op_changes, &nb_notif_dels))
+               return;
+
+       __dbg("pushing current oper changes onto queue");
+
+       changes = XCALLOC(MTYPE_OP_CHANGES_GROUP, sizeof(*changes));
+       changes->adds = nb_notif_adds;
+       changes->dels = nb_notif_dels;
+       op_changes_queue_add_tail(&op_changes_queue, changes);
+
+       RB_INIT(op_changes, &nb_notif_adds);
+       RB_INIT(op_changes, &nb_notif_dels);
+}
+
+static void op_changes_group_free(struct op_changes_group *group)
+{
+       struct op_change *e, *next;
+
+       RB_FOREACH_SAFE (e, op_changes, &group->adds, next) {
+               RB_REMOVE(op_changes, &group->adds, e);
+               op_change_free(e);
+       }
+       RB_FOREACH_SAFE (e, op_changes, &group->dels, next) {
+               RB_REMOVE(op_changes, &group->dels, e);
+               op_change_free(e);
+       }
+       XFREE(MTYPE_OP_CHANGES_GROUP, group);
+}
+
+static struct op_change *__find_less_specific(struct op_changes *head, struct op_change *note)
+{
+       struct op_change *e;
+       size_t plen;
+
+       /*
+        * RB_NFIND finds equal or greater (more specific) than the key,
+        * so the previous node will be a less specific or no match that
+        * sorts earlier. We want to find when we are a more specific
+        * match.
+        */
+       e = RB_NFIND(op_changes, head, note);
+       if (e)
+               e = RB_PREV(op_changes, e);
+       else
+               e = RB_MAX(op_changes, head);
+       if (!e)
+               return NULL;
+       plen = strlen(e->path);
+       if (pathncmp(e->path, note->path, plen))
+               return NULL;
+       /* equal would have been returned from RB_NFIND() then we went RB_PREV */
+       assert(strlen(note->path) != plen);
+       return e;
+}
+
+static void __drop_eq_or_more_specific(struct op_changes *head, const char *path, int plen,
+                                      struct op_change *next)
+{
+       struct op_change *e;
+
+       for (e = next; e != NULL; e = next) {
+               /* if the prefix no longer matches we are done */
+               if (pathncmp(path, e->path, plen))
+                       break;
+               __dbg("dropping more specific %s: %s", head == &nb_notif_adds ? "add" : "delete",
+                     e->path);
+               next = RB_NEXT(op_changes, e);
+               RB_REMOVE(op_changes, head, e);
+               op_change_free(e);
+       }
+}
+
+static void __op_change_add_del(const char *path, struct op_changes *this_head,
+                               struct op_changes *other_head)
+{
+       /* find out if this has been subsumed or will subsume */
+
+       const char *op = this_head == &nb_notif_adds ? "add" : "delete";
+       struct op_change *note = op_change_alloc(path);
+       struct op_change *next, *e;
+       int plen;
+
+       __dbg("processing oper %s change path: %s", op, path);
+
+       /*
+        * See if we are already covered by a more general `op`.
+        */
+       e = __find_less_specific(this_head, note);
+       if (e) {
+               __dbg("%s path already covered by: %s", op, e->path);
+               op_change_free(note);
+               return;
+       }
+
+       /*
+        * Handle having a less-specific `other op`.
+        */
+       e = __find_less_specific(other_head, note);
+       if (e) {
+               if (this_head == &nb_notif_dels) {
+                       /*
+                        * If we have a less-specific add then drop this
+                        * more-specific delete as the add-replace will remove
+                        * this missing state.
+                        */
+                       __dbg("delete path already covered add-replace: %s", e->path);
+               } else {
+                       /*
+                        * If we have a less-specific delete, convert the delete
+                        * to an add, and drop this more-specific add. The new
+                        * less-specific add will pick up the more specific add
+                        * during the walk and as adds are processed as replaces
+                        * any other existing state that was to be deleted will
+                        * still be deleted (unless it also returns) by the replace.
+                        */
+                       __dbg("add covered, converting covering delete to add-replace: %s", e->path);
+                       RB_REMOVE(op_changes, other_head, e);
+                       __op_change_add_del(e->path, &nb_notif_adds, &nb_notif_dels);
+                       op_change_free(e);
+               }
+               op_change_free(note);
+               return;
+       }
+
+       e = RB_INSERT(op_changes, this_head, note);
+       if (e) {
+               __dbg("path already in %s tree: %s", op, path);
+               op_change_free(note);
+               return;
+       }
+
+       __dbg("scanning for subsumed or subsuming: %s", path);
+
+       plen = strlen(path);
+
+       next = RB_NEXT(op_changes, note);
+       __drop_eq_or_more_specific(this_head, path, plen, next);
+
+       /* Drop exact match or more specific `other op` */
+       next = RB_NFIND(op_changes, other_head, note);
+       __drop_eq_or_more_specific(other_head, path, plen, next);
+
+       nb_notif_set_walk_timer();
+}
+
 static void nb_notif_add(const char *path)
 {
+       __op_change_add_del(path, &nb_notif_adds, &nb_notif_dels);
 }
 
 
 static void nb_notif_delete(const char *path)
 {
+       __op_change_add_del(path, &nb_notif_dels, &nb_notif_adds);
 }
 
 struct lyd_node *nb_op_update(struct lyd_node *tree, const char *path, const char *value)
@@ -27,16 +326,14 @@ struct lyd_node *nb_op_update(struct lyd_node *tree, const char *path, const cha
        struct lyd_node *dnode;
        const char *abs_path = NULL;
 
-
        __dbg("updating path: %s with value: %s", path, value);
 
        dnode = yang_state_new(tree, path, value);
 
        if (path[0] == '/')
                abs_path = path;
-       else {
+       else
                abs_path = lyd_path(dnode, LYD_PATH_STD, NULL, 0);
-       }
 
        nb_notif_add(abs_path);
 
@@ -60,6 +357,7 @@ void nb_op_update_delete(struct lyd_node *tree, const char *path)
                assert(abs_path);
                if (path) {
                        char *tmp = darr_strdup(abs_path);
+
                        free(abs_path);
                        abs_path = tmp;
                        if (*darr_last(abs_path) != '/')
@@ -154,3 +452,211 @@ struct lyd_node *nb_op_updatef(struct lyd_node *tree, const char *path, const ch
 
        return dnode;
 }
+
+static struct op_changes_group *op_changes_group_next(void)
+{
+       struct op_changes_group *group;
+
+       group = op_changes_queue_pop(&op_changes_queue);
+       if (!group) {
+               op_changes_group_push();
+               group = op_changes_queue_pop(&op_changes_queue);
+       }
+       if (!group)
+               return NULL;
+       group->cur_changes = &group->dels;
+       group->cur_change = RB_MIN(op_changes, group->cur_changes);
+       if (!group->cur_change) {
+               group->cur_changes = &group->adds;
+               group->cur_change = RB_MIN(op_changes, group->cur_changes);
+               assert(group->cur_change);
+       }
+       return group;
+}
+
+/* ---------------------------- */
+/* Query for changes and notify */
+/* ---------------------------- */
+
+static void timer_walk_continue(struct event *event);
+
+static enum nb_error oper_walk_done(const struct lyd_node *tree, void *arg, enum nb_error ret)
+{
+       struct nb_notif_walk_args *args = arg;
+       struct op_changes_group *group = args->group;
+       const char *path = group->cur_change->path;
+       const char *op = group->cur_changes == &group->adds ? "add" : "delete";
+
+       /* we don't send batches when yielding as we need completed edit in any patch */
+       assert(ret != NB_YIELD);
+
+       nb_notif_walk = NULL;
+
+       if (ret == NB_ERR_NOT_FOUND) {
+               __dbg("Path not found while walking oper tree: %s", path);
+               XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
+               return ret;
+       }
+       /* Something else went wrong with the walk */
+       if (ret != NB_OK) {
+error:
+               __log_err("Error notifying for datastore change on path: %s: %s", path,
+                         nb_err_name(ret));
+               XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
+               /* XXX Need to inform mgmtd/front-ends things are out-of-sync */
+               return ret;
+       }
+
+       __dbg("done with oper-path collection for %s path: %s", op, path);
+
+       /* Do we need this? */
+       while (tree->parent)
+               tree = lyd_parent(tree);
+
+       /* Send the add (replace) notification */
+       if (mgmt_be_send_ds_replace_notification(path, tree)) {
+               ret = NB_ERR;
+               goto error;
+       }
+
+       /*
+        * Advance to next change (either dels or adds or both).
+        */
+
+       group->cur_change = RB_NEXT(op_changes, group->cur_change);
+       if (!group->cur_change) {
+               __dbg("done with oper-path collection for group");
+               op_changes_group_free(group);
+
+               group = op_changes_group_next();
+               args->group = group;
+               if (!group) {
+                       __dbg("done with ALL oper-path collection for notification");
+                       XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
+                       goto done;
+               }
+       }
+
+       event_add_timer_msec(nb_notif_master, timer_walk_continue, args, 0, &nb_notif_timer);
+done:
+       /* Done with current walk and scheduled next one if there is more */
+       nb_notif_walk = NULL;
+
+       return NB_OK;
+}
+
+static LY_ERR nb_notify_delete_changes(struct nb_notif_walk_args *args)
+{
+       struct op_changes_group *group = args->group;
+       LY_ERR err;
+
+       group->cur_change = RB_MIN(op_changes, group->cur_changes);
+       while (group->cur_change) {
+               err = mgmt_be_send_ds_delete_notification(group->cur_change->path);
+               assert(err == LY_SUCCESS); /* XXX */
+
+               group->cur_change = RB_NEXT(op_changes, group->cur_change);
+       }
+
+       return LY_SUCCESS;
+}
+
+static void timer_walk_continue(struct event *event)
+{
+       struct nb_notif_walk_args *args = EVENT_ARG(event);
+       struct op_changes_group *group = args->group;
+       const char *path;
+       LY_ERR err;
+
+       /*
+        * Notify about deletes until we have add changes to collect.
+        */
+       while (group->cur_changes == &group->dels) {
+               err = nb_notify_delete_changes(args);
+               assert(err == LY_SUCCESS);  /* XXX */
+               assert(!group->cur_change); /* we send all the deletes in one message */
+
+               /* after deletes advance to adds */
+               group->cur_changes = &group->adds;
+               group->cur_change = RB_MIN(op_changes, group->cur_changes);
+               if (group->cur_change)
+                       break;
+
+               __dbg("done with oper-path change group");
+               op_changes_group_free(group);
+
+               group = op_changes_group_next();
+               args->group = group;
+               if (!group) {
+                       __dbg("done with ALL oper-path changes");
+                       XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
+                       return;
+               }
+       }
+
+       path = group->cur_change->path;
+       __dbg("starting next oper-path replace walk for path: %s", path);
+       nb_notif_walk = nb_oper_walk(path, NULL, 0, false, NULL, NULL, oper_walk_done, args);
+}
+
+static void timer_walk_start(struct event *event)
+{
+       struct op_changes_group *group;
+       struct nb_notif_walk_args *args;
+
+       __dbg("oper-state change notification timer fires");
+
+       group = op_changes_group_next();
+       if (!group) {
+               __dbg("no oper changes to notify");
+               return;
+       }
+
+       args = XCALLOC(MTYPE_NB_NOTIF_WALK_ARGS, sizeof(*args));
+       args->group = group;
+
+       EVENT_ARG(event) = args;
+       timer_walk_continue(event);
+}
+
+static void nb_notif_set_walk_timer(void)
+{
+       if (nb_notif_walk) {
+               __dbg("oper-state walk already in progress.");
+               return;
+       }
+       if (event_is_scheduled(nb_notif_timer)) {
+               __dbg("oper-state notification timer already set.");
+               return;
+       }
+
+       __dbg("oper-state notification setting timer to fire in: %d msec ", NB_NOTIF_TIMER_MSEC);
+       event_add_timer_msec(nb_notif_master, timer_walk_start, NULL, NB_NOTIF_TIMER_MSEC,
+                            &nb_notif_timer);
+}
+
+void nb_notif_init(struct event_loop *tm)
+{
+       nb_notif_master = tm;
+       op_changes_queue_init(&op_changes_queue);
+}
+
+void nb_notif_terminate(void)
+{
+       struct nb_notif_walk_args *args;
+       struct op_changes_group *group;
+
+       EVENT_OFF(nb_notif_timer);
+
+       if (nb_notif_walk) {
+               nb_oper_cancel_walk(nb_notif_walk);
+               /* need to free the group that's in the walk */
+               args = nb_oper_walk_finish_arg(nb_notif_walk);
+               if (args)
+                       op_changes_group_free(args->group);
+               nb_notif_walk = NULL;
+       }
+
+       while ((group = op_changes_group_next()))
+               op_changes_group_free(group);
+}
index dc4be21611f6f4336fd8613c95571d472b5fcafc..a296b147acc3d6c6c86c10b30276b7fd3f7eaa7d 100644 (file)
@@ -1834,6 +1834,20 @@ bool nb_oper_is_yang_lib_query(const char *xpath)
        return strlen(xpath) > liblen;
 }
 
+void *nb_oper_walk_finish_arg(void *walk)
+{
+       struct nb_op_yield_state *ys = walk;
+
+       return ys->finish_arg;
+}
+
+void *nb_oper_walk_cb_arg(void *walk)
+{
+       struct nb_op_yield_state *ys = walk;
+
+       return ys->cb_arg;
+}
+
 void *nb_oper_walk(const char *xpath, struct yang_translator *translator,
                   uint32_t flags, bool should_batch, nb_oper_data_cb cb,
                   void *cb_arg, nb_oper_data_finish_cb finish, void *finish_arg)