]> git.puffer.fish Git - mirror/frr.git/commitdiff
zebra: dataplane provider enhancements
authorMark Stapp <mjs@voltanet.io>
Fri, 31 Aug 2018 17:46:50 +0000 (13:46 -0400)
committerMark Stapp <mjs@voltanet.io>
Wed, 21 Nov 2018 15:37:54 +0000 (10:37 -0500)
Limit the number of updates processed from the incoming queue;
add more stats. Fill out apis for dataplane providers; convert
route update processing to provider model; move dataplane
status enum

Signed-off-by: Mark Stapp <mjs@voltanet.io>
zebra/zebra_dplane.c
zebra/zebra_dplane.h
zebra/zebra_rib.c

index 6cd52f7bb81ec28d0dbdf0b27ea863bd8ade8332..c859bdaf6e229b3630b66ce6bfe66c4591ae9da6 100644 (file)
@@ -42,6 +42,9 @@ DEFINE_MTYPE(ZEBRA, DP_PROV, "Zebra DPlane Provider")
 const uint32_t DPLANE_DEFAULT_MAX_QUEUED = 200;
 
 
+/* Default value for new work per cycle */
+const uint32_t DPLANE_DEFAULT_NEW_WORK = 100;
+
 /* Validation check macro for context blocks */
 /* #define DPLANE_DEBUG 1 */
 
@@ -69,6 +72,12 @@ struct zebra_dplane_ctx {
        /* Status on return */
        enum zebra_dplane_result zd_status;
 
+       /* Dplane provider id */
+       uint32_t zd_provider;
+
+       /* Flags - used by providers, e.g. */
+       int zd_flags;
+
        /* TODO -- internal/sub-operation status? */
        enum zebra_dplane_result zd_remote_status;
        enum zebra_dplane_result zd_kernel_status;
@@ -118,6 +127,12 @@ struct zebra_dplane_ctx {
        TAILQ_ENTRY(zebra_dplane_ctx) zd_q_entries;
 };
 
+/* Flag that can be set by a pre-kernel provider as a signal that an update
+ * should bypass the kernel.
+ */
+#define DPLANE_CTX_FLAG_NO_KERNEL 0x01
+
+
 /*
  * Registration block for one dataplane provider.
  */
@@ -131,16 +146,35 @@ struct zebra_dplane_provider {
        /* Id value */
        uint32_t dp_id;
 
+       /* Mutex */
+       pthread_mutex_t dp_mutex;
+
+       /* Plugin-provided extra data */
+       void *dp_data;
+
+       /* Flags */
+       int dp_flags;
+
        dplane_provider_process_fp dp_fp;
 
        dplane_provider_fini_fp dp_fini;
 
        _Atomic uint32_t dp_in_counter;
+       _Atomic uint32_t dp_in_max;
+       _Atomic uint32_t dp_out_counter;
+       _Atomic uint32_t dp_out_max;
        _Atomic uint32_t dp_error_counter;
 
-       /* Embedded list linkage */
-       TAILQ_ENTRY(zebra_dplane_provider) dp_q_providers;
+       /* Queue of contexts inbound to the provider */
+       struct dplane_ctx_q dp_ctx_in_q;
+
+       /* Queue of completed contexts outbound from the provider back
+        * towards the dataplane module.
+        */
+       struct dplane_ctx_q dp_ctx_out_q;
 
+       /* Embedded list linkage for provider objects */
+       TAILQ_ENTRY(zebra_dplane_provider) dp_prov_link;
 };
 
 /*
@@ -171,10 +205,16 @@ static struct zebra_dplane_globals {
        /* Limit number of pending, unprocessed updates */
        _Atomic uint32_t dg_max_queued_updates;
 
+       /* Limit number of new updates dequeued at once, to pace an
+        * incoming burst.
+        */
+       uint32_t dg_updates_per_cycle;
+
        _Atomic uint32_t dg_routes_in;
        _Atomic uint32_t dg_routes_queued;
        _Atomic uint32_t dg_routes_queued_max;
        _Atomic uint32_t dg_route_errors;
+       _Atomic uint32_t dg_update_yields;
 
        /* Dataplane pthread */
        struct frr_pthread *dg_pthread;
@@ -191,14 +231,20 @@ static struct zebra_dplane_globals {
 } zdplane_info;
 
 /*
- * Lock and unlock for interactions with the zebra 'core'
+ * Lock and unlock for interactions with the zebra 'core' pthread
  */
 #define DPLANE_LOCK() pthread_mutex_lock(&zdplane_info.dg_mutex)
-
 #define DPLANE_UNLOCK() pthread_mutex_unlock(&zdplane_info.dg_mutex)
 
+
+/*
+ * Lock and unlock for individual providers
+ */
+#define DPLANE_PROV_LOCK(p)   pthread_mutex_lock(&((p)->dp_mutex))
+#define DPLANE_PROV_UNLOCK(p) pthread_mutex_unlock(&((p)->dp_mutex))
+
 /* Prototypes */
-static int dplane_route_process(struct thread *event);
+static int dplane_thread_loop(struct thread *event);
 
 /*
  * Public APIs
@@ -285,6 +331,38 @@ enum zebra_dplane_result dplane_ctx_get_status(
        return ctx->zd_status;
 }
 
+void dplane_ctx_set_status(struct zebra_dplane_ctx *ctx,
+                          enum zebra_dplane_result status)
+{
+       DPLANE_CTX_VALID(ctx);
+
+       ctx->zd_status = status;
+}
+
+/* Retrieve last/current provider id */
+uint32_t dplane_ctx_get_provider(const struct zebra_dplane_ctx *ctx)
+{
+       DPLANE_CTX_VALID(ctx);
+       return ctx->zd_provider;
+}
+
+/* Providers run before the kernel can control whether a kernel
+ * update should be done.
+ */
+void dplane_ctx_set_skip_kernel(struct zebra_dplane_ctx *ctx)
+{
+       DPLANE_CTX_VALID(ctx);
+
+       SET_FLAG(ctx->zd_flags, DPLANE_CTX_FLAG_NO_KERNEL);
+}
+
+bool dplane_ctx_is_skip_kernel(const struct zebra_dplane_ctx *ctx)
+{
+       DPLANE_CTX_VALID(ctx);
+
+       return CHECK_FLAG(ctx->zd_flags, DPLANE_CTX_FLAG_NO_KERNEL);
+}
+
 enum dplane_op_e dplane_ctx_get_op(const struct zebra_dplane_ctx *ctx)
 {
        DPLANE_CTX_VALID(ctx);
@@ -517,6 +595,7 @@ const struct zebra_dplane_info *dplane_ctx_get_ns(
  * End of dplane context accessors
  */
 
+
 /*
  * Retrieve the limit on the number of pending, unprocessed updates.
  */
@@ -678,34 +757,11 @@ static int dplane_route_enqueue(struct zebra_dplane_ctx *ctx)
        }
 
        /* Ensure that an event for the dataplane thread is active */
-       thread_add_event(zdplane_info.dg_master, dplane_route_process, NULL, 0,
-                        &zdplane_info.dg_t_update);
-
-       ret = AOK;
+       ret = dplane_provider_work_ready();
 
        return ret;
 }
 
-/*
- * Attempt to dequeue a route-update block
- */
-static struct zebra_dplane_ctx *dplane_route_dequeue(void)
-{
-       struct zebra_dplane_ctx *ctx = NULL;
-
-       DPLANE_LOCK();
-       {
-               ctx = TAILQ_FIRST(&zdplane_info.dg_route_ctx_q);
-               if (ctx) {
-                       TAILQ_REMOVE(&zdplane_info.dg_route_ctx_q,
-                                    ctx, zd_q_entries);
-               }
-       }
-       DPLANE_UNLOCK();
-
-       return ctx;
-}
-
 /*
  * Utility that prepares a route update and enqueues it for processing
  */
@@ -828,68 +884,15 @@ done:
        return ret;
 }
 
-/*
- * Event handler function for routing updates
- */
-static int dplane_route_process(struct thread *event)
-{
-       enum zebra_dplane_result res;
-       struct zebra_dplane_ctx *ctx;
-
-       while (1) {
-               /* Check for shutdown */
-               if (!zdplane_info.dg_run)
-                       break;
-
-               /* TODO -- limit number of updates per cycle? */
-               ctx = dplane_route_dequeue();
-               if (ctx == NULL)
-                       break;
-
-               /* Update counter */
-               atomic_fetch_sub_explicit(&zdplane_info.dg_routes_queued, 1,
-                                         memory_order_relaxed);
-
-               if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) {
-                       char dest_str[PREFIX_STRLEN];
-
-                       prefix2str(dplane_ctx_get_dest(ctx),
-                                  dest_str, sizeof(dest_str));
-
-                       zlog_debug("%u:%s Dplane route update ctx %p op %s",
-                                  dplane_ctx_get_vrf(ctx), dest_str,
-                                  ctx, dplane_op2str(dplane_ctx_get_op(ctx)));
-               }
-
-               /* TODO -- support series of providers */
-
-               /* Initially, just doing kernel-facing update here */
-               res = kernel_route_update(ctx);
-
-               if (res != ZEBRA_DPLANE_REQUEST_SUCCESS)
-                       atomic_fetch_add_explicit(&zdplane_info.dg_route_errors,
-                                                 1, memory_order_relaxed);
-
-               ctx->zd_status = res;
-
-               /* Enqueue result to zebra main context */
-               zdplane_info.dg_results_cb(ctx);
-
-               ctx = NULL;
-       }
-
-       return 0;
-}
-
 /*
  * Handler for 'show dplane'
  */
 int dplane_show_helper(struct vty *vty, bool detailed)
 {
-       uint64_t queued, limit, queue_max, errs, incoming;
+       uint64_t queued, queue_max, limit, errs, incoming, yields;
 
        /* Using atomics because counters are being changed in different
-        * contexts.
+        * pthread contexts.
         */
        incoming = atomic_load_explicit(&zdplane_info.dg_routes_in,
                                        memory_order_relaxed);
@@ -901,12 +904,16 @@ int dplane_show_helper(struct vty *vty, bool detailed)
                                         memory_order_relaxed);
        errs = atomic_load_explicit(&zdplane_info.dg_route_errors,
                                    memory_order_relaxed);
+       yields = atomic_load_explicit(&zdplane_info.dg_update_yields,
+                                     memory_order_relaxed);
 
-       vty_out(vty, "Route updates:            %"PRIu64"\n", incoming);
+       vty_out(vty, "Zebra dataplane:\nRoute updates:            %"PRIu64"\n",
+               incoming);
        vty_out(vty, "Route update errors:      %"PRIu64"\n", errs);
        vty_out(vty, "Route update queue limit: %"PRIu64"\n", limit);
        vty_out(vty, "Route update queue depth: %"PRIu64"\n", queued);
        vty_out(vty, "Route update queue max:   %"PRIu64"\n", queue_max);
+       vty_out(vty, "Route update yields:      %"PRIu64"\n", yields);
 
        return CMD_SUCCESS;
 }
@@ -916,8 +923,35 @@ int dplane_show_helper(struct vty *vty, bool detailed)
  */
 int dplane_show_provs_helper(struct vty *vty, bool detailed)
 {
-       vty_out(vty, "Zebra dataplane providers:%s\n",
-               (detailed ? " (detailed)" : ""));
+       struct zebra_dplane_provider *prov;
+       uint64_t in, in_max, out, out_max;
+
+       vty_out(vty, "Zebra dataplane providers:\n");
+
+       DPLANE_LOCK();
+       prov = TAILQ_FIRST(&zdplane_info.dg_providers_q);
+       DPLANE_UNLOCK();
+
+       /* Show counters, useful info from each registered provider */
+       while (prov) {
+
+               in = atomic_load_explicit(&prov->dp_in_counter,
+                                         memory_order_relaxed);
+               in_max = atomic_load_explicit(&prov->dp_in_max,
+                                             memory_order_relaxed);
+               out = atomic_load_explicit(&prov->dp_out_counter,
+                                          memory_order_relaxed);
+               out_max = atomic_load_explicit(&prov->dp_out_max,
+                                              memory_order_relaxed);
+
+               vty_out(vty, "%s (%u): in: %"PRIu64", max: %"PRIu64", "
+                       "out: %"PRIu64", max: %"PRIu64"\n",
+                       prov->dp_name, prov->dp_id, in, in_max, out, out_max);
+
+               DPLANE_LOCK();
+               prov = TAILQ_NEXT(prov, dp_prov_link);
+               DPLANE_UNLOCK();
+       }
 
        return CMD_SUCCESS;
 }
@@ -926,9 +960,11 @@ int dplane_show_provs_helper(struct vty *vty, bool detailed)
  * Provider registration
  */
 int dplane_provider_register(const char *name,
-                            enum dplane_provider_prio_e prio,
+                            enum dplane_provider_prio prio,
+                            int flags,
                             dplane_provider_process_fp fp,
-                            dplane_provider_fini_fp fini_fp)
+                            dplane_provider_fini_fp fini_fp,
+                            void *data)
 {
        int ret = 0;
        struct zebra_dplane_provider *p, *last;
@@ -952,37 +988,160 @@ int dplane_provider_register(const char *name,
                goto done;
        }
 
-       strncpy(p->dp_name, name, DPLANE_PROVIDER_NAMELEN);
-       p->dp_name[DPLANE_PROVIDER_NAMELEN] = '\0'; /* Belt-and-suspenders */
+       pthread_mutex_init(&(p->dp_mutex), NULL);
+       TAILQ_INIT(&(p->dp_ctx_in_q));
+       TAILQ_INIT(&(p->dp_ctx_out_q));
 
        p->dp_priority = prio;
        p->dp_fp = fp;
        p->dp_fini = fini_fp;
+       p->dp_data = data;
 
-       /* Lock the lock - the dplane pthread may be running */
+       /* Lock - the dplane pthread may be running */
        DPLANE_LOCK();
 
        p->dp_id = ++zdplane_info.dg_provider_id;
 
+       if (name)
+               strlcpy(p->dp_name, name, DPLANE_PROVIDER_NAMELEN);
+       else
+               snprintf(p->dp_name, DPLANE_PROVIDER_NAMELEN,
+                        "provider-%u", p->dp_id);
+
        /* Insert into list ordered by priority */
-       TAILQ_FOREACH(last, &zdplane_info.dg_providers_q, dp_q_providers) {
+       TAILQ_FOREACH(last, &zdplane_info.dg_providers_q, dp_prov_link) {
                if (last->dp_priority > p->dp_priority)
                        break;
        }
 
        if (last)
-               TAILQ_INSERT_BEFORE(last, p, dp_q_providers);
+               TAILQ_INSERT_BEFORE(last, p, dp_prov_link);
        else
                TAILQ_INSERT_TAIL(&zdplane_info.dg_providers_q, p,
-                                 dp_q_providers);
+                                 dp_prov_link);
 
        /* And unlock */
        DPLANE_UNLOCK();
 
+       if (IS_ZEBRA_DEBUG_DPLANE)
+               zlog_debug("dplane: registered new provider '%s' (%u), prio %d",
+                          p->dp_name, p->dp_id, p->dp_priority);
+
 done:
        return ret;
 }
 
+/* Accessors for provider attributes */
+const char *dplane_provider_get_name(const struct zebra_dplane_provider *prov)
+{
+       return prov->dp_name;
+}
+
+uint32_t dplane_provider_get_id(const struct zebra_dplane_provider *prov)
+{
+       return prov->dp_id;
+}
+
+void *dplane_provider_get_data(const struct zebra_dplane_provider *prov)
+{
+       return prov->dp_data;
+}
+
+int dplane_provider_get_work_limit(const struct zebra_dplane_provider *prov)
+{
+       return zdplane_info.dg_updates_per_cycle;
+}
+
+/*
+ * Dequeue and maintain associated counter
+ */
+struct zebra_dplane_ctx *dplane_provider_dequeue_in_ctx(
+       struct zebra_dplane_provider *prov)
+{
+       struct zebra_dplane_ctx *ctx = NULL;
+
+       if (dplane_provider_is_threaded(prov))
+               DPLANE_PROV_LOCK(prov);
+
+       ctx = TAILQ_FIRST(&(prov->dp_ctx_in_q));
+       if (ctx) {
+               TAILQ_REMOVE(&(prov->dp_ctx_in_q), ctx, zd_q_entries);
+       }
+
+       if (dplane_provider_is_threaded(prov))
+               DPLANE_PROV_UNLOCK(prov);
+
+       return ctx;
+}
+
+/*
+ * Dequeue work to a list, return count
+ */
+int dplane_provider_dequeue_in_list(struct zebra_dplane_provider *prov,
+                                   struct dplane_ctx_q *listp)
+{
+       int limit, ret;
+       struct zebra_dplane_ctx *ctx;
+
+       limit = zdplane_info.dg_updates_per_cycle;
+
+       if (dplane_provider_is_threaded(prov))
+               DPLANE_PROV_LOCK(prov);
+
+       for (ret = 0; ret < limit; ret++) {
+               ctx = TAILQ_FIRST(&(prov->dp_ctx_in_q));
+               if (ctx) {
+                       TAILQ_REMOVE(&(prov->dp_ctx_in_q), ctx, zd_q_entries);
+
+                       TAILQ_INSERT_TAIL(listp, ctx, zd_q_entries);
+               } else {
+                       break;
+               }
+       }
+
+       if (dplane_provider_is_threaded(prov))
+               DPLANE_PROV_UNLOCK(prov);
+
+       return ret;
+}
+
+/*
+ * Enqueue and maintain associated counter
+ */
+void dplane_provider_enqueue_out_ctx(struct zebra_dplane_provider *prov,
+                                    struct zebra_dplane_ctx *ctx)
+{
+       if (dplane_provider_is_threaded(prov))
+               DPLANE_PROV_LOCK(prov);
+
+       TAILQ_INSERT_TAIL(&(prov->dp_ctx_out_q), ctx,
+                         zd_q_entries);
+
+       if (dplane_provider_is_threaded(prov))
+               DPLANE_PROV_UNLOCK(prov);
+
+       atomic_fetch_add_explicit(&(prov->dp_out_counter), 1,
+                                 memory_order_relaxed);
+}
+
+bool dplane_provider_is_threaded(const struct zebra_dplane_provider *prov)
+{
+       return (prov->dp_flags & DPLANE_PROV_FLAG_THREADED);
+}
+
+/*
+ * Provider api to signal that work/events are available
+ * for the dataplane pthread.
+ */
+int dplane_provider_work_ready(void)
+{
+       thread_add_event(zdplane_info.dg_master,
+                        dplane_thread_loop, NULL, 0,
+                        &zdplane_info.dg_t_update);
+
+       return AOK;
+}
+
 /*
  * Zebra registers a results callback with the dataplane system
  */
@@ -993,37 +1152,154 @@ int dplane_results_register(dplane_results_fp fp)
 }
 
 /*
- * Initialize the dataplane module during startup, internal/private version
+ * Kernel dataplane provider
  */
-static void zebra_dplane_init_internal(struct zebra_t *zebra)
+
+/*
+ * Kernel provider callback
+ */
+static int kernel_dplane_process_func(struct zebra_dplane_provider *prov)
 {
-       memset(&zdplane_info, 0, sizeof(zdplane_info));
+       enum zebra_dplane_result res;
+       struct zebra_dplane_ctx *ctx;
+       int counter, limit;
 
-       pthread_mutex_init(&zdplane_info.dg_mutex, NULL);
+       limit = dplane_provider_get_work_limit(prov);
 
-       TAILQ_INIT(&zdplane_info.dg_route_ctx_q);
-       TAILQ_INIT(&zdplane_info.dg_providers_q);
+       if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
+               zlog_debug("dplane provider '%s': processing",
+                          dplane_provider_get_name(prov));
 
-       zdplane_info.dg_max_queued_updates = DPLANE_DEFAULT_MAX_QUEUED;
+       for (counter = 0; counter < limit; counter++) {
 
-       /* TODO -- register default kernel 'provider' during init */
-       zdplane_info.dg_run = true;
+               ctx = dplane_provider_dequeue_in_ctx(prov);
+               if (ctx == NULL)
+                       break;
 
-       /* Start dataplane pthread */
+               if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) {
+                       char dest_str[PREFIX_STRLEN];
 
-       zdplane_info.dg_run = true;
+                       prefix2str(dplane_ctx_get_dest(ctx),
+                                  dest_str, sizeof(dest_str));
 
-       struct frr_pthread_attr pattr = {
-               .start = frr_pthread_attr_default.start,
-               .stop = frr_pthread_attr_default.stop
-       };
+                       zlog_debug("%u:%s Dplane route update ctx %p op %s",
+                                  dplane_ctx_get_vrf(ctx), dest_str,
+                                  ctx, dplane_op2str(dplane_ctx_get_op(ctx)));
+               }
 
-       zdplane_info.dg_pthread = frr_pthread_new(&pattr, "Zebra dplane thread",
-                                                 "Zebra dplane");
+               /* Call into the synchronous kernel-facing code here */
+               res = kernel_route_update(ctx);
 
-       zdplane_info.dg_master = zdplane_info.dg_pthread->master;
+               if (res != ZEBRA_DPLANE_REQUEST_SUCCESS)
+                       atomic_fetch_add_explicit(
+                               &zdplane_info.dg_route_errors, 1,
+                               memory_order_relaxed);
 
-       frr_pthread_run(zdplane_info.dg_pthread, NULL);
+               dplane_ctx_set_status(ctx, res);
+
+               dplane_provider_enqueue_out_ctx(prov, ctx);
+       }
+
+       /* Ensure that we'll run the work loop again if there's still
+        * more work to do.
+        */
+       if (counter >= limit) {
+               if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
+                       zlog_debug("dplane provider '%s' reached max updates %d",
+                                  dplane_provider_get_name(prov), counter);
+
+               atomic_fetch_add_explicit(&zdplane_info.dg_update_yields,
+                                         1, memory_order_relaxed);
+
+               dplane_provider_work_ready();
+       }
+
+       return 0;
+}
+
+/*
+ * Test dataplane provider plugin
+ */
+
+/*
+ * Test provider process callback
+ */
+static int test_dplane_process_func(struct zebra_dplane_provider *prov)
+{
+       struct zebra_dplane_ctx *ctx;
+       int counter, limit;
+
+       /* Just moving from 'in' queue to 'out' queue */
+
+       if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
+               zlog_debug("dplane provider '%s': processing",
+                          dplane_provider_get_name(prov));
+
+       limit = dplane_provider_get_work_limit(prov);
+
+       for (counter = 0; counter < limit; counter++) {
+
+               ctx = dplane_provider_dequeue_in_ctx(prov);
+               if (ctx == NULL)
+                       break;
+
+               dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
+
+               dplane_provider_enqueue_out_ctx(prov, ctx);
+       }
+
+       /* Ensure that we'll run the work loop again if there's still
+        * more work to do.
+        */
+       if (counter >= limit)
+               dplane_provider_work_ready();
+
+       return 0;
+}
+
+/*
+ * Test provider shutdown/fini callback
+ */
+static int test_dplane_shutdown_func(struct zebra_dplane_provider *prov,
+                                    bool early)
+{
+       if (IS_ZEBRA_DEBUG_DPLANE)
+               zlog_debug("dplane provider '%s': %sshutdown",
+                          dplane_provider_get_name(prov),
+                          early ? "early " : "");
+
+       return 0;
+}
+
+/*
+ * Register default kernel provider
+ */
+static void dplane_provider_init(void)
+{
+       int ret;
+
+       ret = dplane_provider_register("Kernel",
+                                      DPLANE_PRIO_KERNEL,
+                                      DPLANE_PROV_FLAGS_DEFAULT,
+                                      kernel_dplane_process_func,
+                                      NULL,
+                                      NULL);
+
+       if (ret != AOK)
+               zlog_err("Unable to register kernel dplane provider: %d",
+                        ret);
+
+       /* TODO -- make the test provider optional... */
+       ret = dplane_provider_register("Test",
+                                      DPLANE_PRIO_PRE_KERNEL,
+                                      DPLANE_PROV_FLAGS_DEFAULT,
+                                      test_dplane_process_func,
+                                      test_dplane_shutdown_func,
+                                      NULL /* data */);
+
+       if (ret != AOK)
+               zlog_err("Unable to register test dplane provider: %d",
+                        ret);
 }
 
 /* Indicates zebra shutdown/exit is in progress. Some operations may be
@@ -1059,7 +1335,9 @@ static bool dplane_work_pending(void)
 {
        struct zebra_dplane_ctx *ctx;
 
-       /* TODO -- just checking incoming/pending work for now */
+       /* TODO -- just checking incoming/pending work for now, must check
+        * providers
+        */
        DPLANE_LOCK();
        {
                ctx = TAILQ_FIRST(&zdplane_info.dg_route_ctx_q);
@@ -1120,6 +1398,184 @@ void zebra_dplane_finish(void)
                         &zdplane_info.dg_t_shutdown_check);
 }
 
+/*
+ * Main dataplane pthread event loop. The thread takes new incoming work
+ * and offers it to the first provider. It then iterates through the
+ * providers, taking complete work from each one and offering it
+ * to the next in order. At each step, a limited number of updates are
+ * processed during a cycle in order to provide some fairness.
+ */
+static int dplane_thread_loop(struct thread *event)
+{
+       struct dplane_ctx_q work_list;
+       struct dplane_ctx_q error_list;
+       struct zebra_dplane_provider *prov;
+       struct zebra_dplane_ctx *ctx, *tctx;
+       int limit, counter, error_counter;
+       uint64_t lval;
+
+       /* Capture work limit per cycle */
+       limit = zdplane_info.dg_updates_per_cycle;
+
+       TAILQ_INIT(&work_list);
+
+       /* Check for zebra shutdown */
+       if (!zdplane_info.dg_run)
+               goto done;
+
+       /* Dequeue some incoming work from zebra (if any) onto the temporary
+        * working list.
+        */
+       DPLANE_LOCK();
+
+       /* Locate initial registered provider */
+       prov = TAILQ_FIRST(&zdplane_info.dg_providers_q);
+
+       for (counter = 0; counter < limit; counter++) {
+               ctx = TAILQ_FIRST(&zdplane_info.dg_route_ctx_q);
+               if (ctx) {
+                       TAILQ_REMOVE(&zdplane_info.dg_route_ctx_q, ctx,
+                                    zd_q_entries);
+
+                       atomic_fetch_sub_explicit(
+                               &zdplane_info.dg_routes_queued, 1,
+                               memory_order_relaxed);
+
+                       ctx->zd_provider = prov->dp_id;
+
+                       TAILQ_INSERT_TAIL(&work_list, ctx, zd_q_entries);
+               } else {
+                       break;
+               }
+       }
+
+       DPLANE_UNLOCK();
+
+       if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
+               zlog_debug("dplane: incoming new work counter: %d", counter);
+
+       /* Iterate through the registered providers, offering new incoming
+        * work. If the provider has outgoing work in its queue, take that
+        * work for the next provider
+        */
+       while (prov) {
+
+               if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
+                       zlog_debug("dplane enqueues %d new work to provider '%s'",
+                                  counter, dplane_provider_get_name(prov));
+
+               /* Capture current provider id in each context; check for
+                * error status.
+                */
+               TAILQ_FOREACH_SAFE(ctx, &work_list, zd_q_entries, tctx) {
+                       if (dplane_ctx_get_status(ctx) ==
+                           ZEBRA_DPLANE_REQUEST_SUCCESS) {
+                               ctx->zd_provider = prov->dp_id;
+                       } else {
+                               /*
+                                * TODO -- improve error-handling: recirc
+                                * errors backwards so that providers can
+                                * 'undo' their work (if they want to)
+                                */
+
+                               /* Move to error list; will be returned
+                                * zebra main.
+                                */
+                               TAILQ_REMOVE(&work_list, ctx, zd_q_entries);
+                               TAILQ_INSERT_TAIL(&error_list,
+                                                 ctx, zd_q_entries);
+                               error_counter++;
+                       }
+               }
+
+               /* Enqueue new work to the provider */
+               if (dplane_provider_is_threaded(prov))
+                       DPLANE_PROV_LOCK(prov);
+
+               if (TAILQ_FIRST(&work_list))
+                       TAILQ_CONCAT(&(prov->dp_ctx_in_q), &work_list,
+                                    zd_q_entries);
+
+               lval = atomic_add_fetch_explicit(&prov->dp_in_counter, counter,
+                                                memory_order_relaxed);
+               if (lval > prov->dp_in_max)
+                       atomic_store_explicit(&prov->dp_in_max, lval,
+                                             memory_order_relaxed);
+
+               if (dplane_provider_is_threaded(prov))
+                       DPLANE_PROV_UNLOCK(prov);
+
+               /* Reset the temp list */
+               TAILQ_INIT(&work_list);
+               counter = 0;
+
+               /* Call into the provider code */
+               (*prov->dp_fp)(prov);
+
+               /* Check for zebra shutdown */
+               if (!zdplane_info.dg_run)
+                       break;
+
+               /* Dequeue completed work from the provider */
+               if (dplane_provider_is_threaded(prov))
+                       DPLANE_PROV_LOCK(prov);
+
+               while (counter < limit) {
+                       ctx = TAILQ_FIRST(&(prov->dp_ctx_out_q));
+                       if (ctx) {
+                               TAILQ_REMOVE(&(prov->dp_ctx_out_q), ctx,
+                                            zd_q_entries);
+
+                               TAILQ_INSERT_TAIL(&work_list,
+                                                 ctx, zd_q_entries);
+                               counter++;
+                       } else
+                               break;
+               }
+
+               if (dplane_provider_is_threaded(prov))
+                       DPLANE_PROV_UNLOCK(prov);
+
+               if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
+                       zlog_debug("dplane dequeues %d completed work from provider %s",
+                                  counter, dplane_provider_get_name(prov));
+
+               /* Locate next provider */
+               DPLANE_LOCK();
+               prov = TAILQ_NEXT(prov, dp_prov_link);
+               DPLANE_UNLOCK();
+
+               if (prov) {
+                       TAILQ_FOREACH(ctx, &work_list, zd_q_entries)
+                               ctx->zd_provider = prov->dp_id;
+               }
+       }
+
+       /* After all providers have been serviced, enqueue any completed
+        * work back to zebra so it can process the results.
+        */
+
+       if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
+               zlog_debug("dplane has %d completed work for zebra main",
+                          counter);
+
+       /*
+        * TODO -- I'd rather hand lists through the api to zebra main,
+        * to reduce the number of lock/unlock cycles
+        */
+       for (ctx = TAILQ_FIRST(&work_list); ctx; ) {
+               TAILQ_REMOVE(&work_list, ctx, zd_q_entries);
+
+               /* Call through to zebra main */
+               (*zdplane_info.dg_results_cb)(ctx);
+
+               ctx = TAILQ_FIRST(&work_list);
+       }
+
+done:
+       return 0;
+}
+
 /*
  * Final phase of shutdown, after all work enqueued to dplane has been
  * processed. This is called from the zebra main pthread context.
@@ -1142,11 +1598,51 @@ void zebra_dplane_shutdown(void)
        zdplane_info.dg_pthread = NULL;
        zdplane_info.dg_master = NULL;
 
-       /* Notify provider(s) of final shutdown */
+       /* TODO -- Notify provider(s) of final shutdown */
+
+       /* TODO -- Clean-up provider objects */
+
+       /* TODO -- Clean queue(s), free memory */
+}
+
+/*
+ * Initialize the dataplane module during startup, internal/private version
+ */
+static void zebra_dplane_init_internal(struct zebra_t *zebra)
+{
+       memset(&zdplane_info, 0, sizeof(zdplane_info));
+
+       pthread_mutex_init(&zdplane_info.dg_mutex, NULL);
 
-       /* Clean-up provider objects */
+       TAILQ_INIT(&zdplane_info.dg_route_ctx_q);
+       TAILQ_INIT(&zdplane_info.dg_providers_q);
 
-       /* Clean queue(s) */
+       zdplane_info.dg_updates_per_cycle = DPLANE_DEFAULT_NEW_WORK;
+
+       zdplane_info.dg_max_queued_updates = DPLANE_DEFAULT_MAX_QUEUED;
+
+       /* Register default kernel 'provider' during init */
+       dplane_provider_init();
+
+       /* Start dataplane pthread */
+
+       zdplane_info.dg_run = true;
+
+       struct frr_pthread_attr pattr = {
+               .start = frr_pthread_attr_default.start,
+               .stop = frr_pthread_attr_default.stop
+       };
+
+       zdplane_info.dg_pthread = frr_pthread_new(&pattr, "Zebra dplane thread",
+                                                 "Zebra dplane");
+
+       zdplane_info.dg_master = zdplane_info.dg_pthread->master;
+
+       /* Enqueue an initial event for the dataplane pthread */
+       thread_add_event(zdplane_info.dg_master, dplane_thread_loop, NULL, 0,
+                        &zdplane_info.dg_t_update);
+
+       frr_pthread_run(zdplane_info.dg_pthread, NULL);
 }
 
 /*
index 999e0f39e4c3607f13272a2a3f8fcadad7a305e7..844fd59600920c5f3b23cac3e662c600d685dd1a 100644 (file)
@@ -29,7 +29,6 @@
 #include "zebra/rib.h"
 #include "zebra/zserv.h"
 
-
 /* Key netlink info from zebra ns */
 struct zebra_dplane_info {
        ns_id_t ns_id;
@@ -127,6 +126,12 @@ void dplane_ctx_fini(struct zebra_dplane_ctx **pctx);
 void dplane_ctx_enqueue_tail(struct dplane_ctx_q *q,
                             const struct zebra_dplane_ctx *ctx);
 
+/* Append a list of context blocks to another list - again, just keeping
+ * the context struct opaque.
+ */
+void dplane_ctx_list_append(struct dplane_ctx_q *to_list,
+                           struct dplane_ctx_q *from_list);
+
 /* Dequeue a context block from the head of caller's tailq */
 void dplane_ctx_dequeue(struct dplane_ctx_q *q, struct zebra_dplane_ctx **ctxp);
 
@@ -135,6 +140,8 @@ void dplane_ctx_dequeue(struct dplane_ctx_q *q, struct zebra_dplane_ctx **ctxp);
  */
 enum zebra_dplane_result dplane_ctx_get_status(
        const struct zebra_dplane_ctx *ctx);
+void dplane_ctx_set_status(struct zebra_dplane_ctx *ctx,
+                          enum zebra_dplane_result status);
 const char *dplane_res2str(enum zebra_dplane_result res);
 
 enum dplane_op_e dplane_ctx_get_op(const struct zebra_dplane_ctx *ctx);
@@ -142,6 +149,15 @@ const char *dplane_op2str(enum dplane_op_e op);
 
 const struct prefix *dplane_ctx_get_dest(const struct zebra_dplane_ctx *ctx);
 
+/* Retrieve last/current provider id */
+uint32_t dplane_ctx_get_provider(const struct zebra_dplane_ctx *ctx);
+
+/* Providers running before the kernel can control whether a kernel
+ * update should be done.
+ */
+void dplane_ctx_set_skip_kernel(struct zebra_dplane_ctx *ctx);
+bool dplane_ctx_is_skip_kernel(const struct zebra_dplane_ctx *ctx);
+
 /* Source prefix is a little special - use convention to return NULL
  * to mean "no src prefix"
  */
@@ -212,9 +228,11 @@ int dplane_show_provs_helper(struct vty *vty, bool detailed);
 
 
 /*
- * Dataplane providers: modules that consume dataplane events.
+ * Dataplane providers: modules that process or consume dataplane events.
  */
 
+struct zebra_dplane_provider;
+
 /* Support string name for a dataplane provider */
 #define DPLANE_PROVIDER_NAMELEN 64
 
@@ -223,7 +241,7 @@ int dplane_show_provs_helper(struct vty *vty, bool detailed);
  * followed by the kernel, followed by some post-processing step (such as
  * the fpm output stream.)
  */
-enum dplane_provider_prio_e {
+enum dplane_provider_prio {
        DPLANE_PRIO_NONE = 0,
        DPLANE_PRIO_PREPROCESS,
        DPLANE_PRIO_PRE_KERNEL,
@@ -232,28 +250,72 @@ enum dplane_provider_prio_e {
        DPLANE_PRIO_LAST
 };
 
-/* Provider's entry-point to process a context block */
-typedef int (*dplane_provider_process_fp)(struct zebra_dplane_ctx *ctx);
+/* Provider's entry-point for incoming work, called in the context of the
+ * dataplane pthread. The dataplane pthread enqueues any new work to the
+ * provider's 'inbound' queue, then calls the callback. The dataplane
+ * then checks the provider's outbound queue.
+ */
+typedef int (*dplane_provider_process_fp)(struct zebra_dplane_provider *prov);
+
+/* Provider's entry-point for shutdown and cleanup. Called with 'early'
+ * during shutdown, to indicate that the dataplane subsystem is allowing
+ * work to move through the providers and finish. When called without 'early',
+ * the provider should release all resources (if it has any allocated).
+ */
+typedef int (*dplane_provider_fini_fp)(struct zebra_dplane_provider *prov,
+                                      bool early);
+
+/* Flags values used during provider registration. */
+#define DPLANE_PROV_FLAGS_DEFAULT  0x0
+
+/* Provider will be spawning its own worker thread */
+#define DPLANE_PROV_FLAG_THREADED  0x1
 
-/* Provider's entry-point for shutdown and cleanup */
-typedef int (*dplane_provider_fini_fp)(void);
 
-/* Provider registration */
+/* Provider registration: ordering or priority value, callbacks, and optional
+ * opaque data value.
+ */
 int dplane_provider_register(const char *name,
-                            enum dplane_provider_prio_e prio,
+                            enum dplane_provider_prio prio,
+                            int flags,
                             dplane_provider_process_fp fp,
-                            dplane_provider_fini_fp fini_fp);
+                            dplane_provider_fini_fp fini_fp,
+                            void *data);
 
-/*
- * Results are returned to zebra core via a callback
+/* Accessors for provider attributes */
+const char *dplane_provider_get_name(const struct zebra_dplane_provider *prov);
+uint32_t dplane_provider_get_id(const struct zebra_dplane_provider *prov);
+void *dplane_provider_get_data(const struct zebra_dplane_provider *prov);
+bool dplane_provider_is_threaded(const struct zebra_dplane_provider *prov);
+
+/* Providers should limit number of updates per work cycle */
+int dplane_provider_get_work_limit(const struct zebra_dplane_provider *prov);
+
+/* Provider api to signal that work/events are available
+ * for the dataplane pthread.
  */
-typedef int (*dplane_results_fp)(const struct zebra_dplane_ctx *ctx);
+int dplane_provider_work_ready(void);
+
+/* Dequeue, maintain associated counter and locking */
+struct zebra_dplane_ctx *dplane_provider_dequeue_in_ctx(
+       struct zebra_dplane_provider *prov);
+
+/* Dequeue work to a list, maintain counter and locking, return count */
+int dplane_provider_dequeue_in_list(struct zebra_dplane_provider *prov,
+                                   struct dplane_ctx_q *listp);
+
+/* Enqueue, maintain associated counter and locking */
+void dplane_provider_enqueue_out_ctx(struct zebra_dplane_provider *prov,
+                                    struct zebra_dplane_ctx *ctx);
 
 /*
  * Zebra registers a results callback with the dataplane. The callback is
- * called in the dataplane thread context, so the expectation is that the
- * context is queued (or that processing is very limited).
+ * called in the dataplane pthread context, so the expectation is that the
+ * context is queued for the zebra main pthread or that processing
+ * is very limited.
  */
+typedef int (*dplane_results_fp)(struct zebra_dplane_ctx *ctx);
+
 int dplane_results_register(dplane_results_fp fp);
 
 /*
@@ -264,7 +326,8 @@ void zebra_dplane_init(void);
 
 /* Finalize/cleanup apis, one called early as shutdown is starting,
  * one called late at the end of zebra shutdown, and then one called
- * from the zebra main thread to stop the dplane thread free all resources.
+ * from the zebra main pthread to stop the dplane pthread and
+ * free all resources.
  *
  * Zebra expects to try to clean up all vrfs and all routes during
  * shutdown, so the dplane must be available until very late.
index 828539252732166570af5970fd31135c58a703db..f161d5875b1d8fda1cd7f7b73a395affac77d22b 100644 (file)
@@ -1932,11 +1932,10 @@ static void rib_process_after(struct zebra_dplane_ctx *ctx)
        op = dplane_ctx_get_op(ctx);
        status = dplane_ctx_get_status(ctx);
 
-       if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) {
+       if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
                zlog_debug("%u:%s Processing dplane ctx %p, op %s result %s",
                           dplane_ctx_get_vrf(ctx), dest_str, ctx,
                           dplane_op2str(op), dplane_res2str(status));
-       }
 
        if (op == DPLANE_OP_ROUTE_DELETE) {
                /*
@@ -3289,7 +3288,7 @@ static int rib_process_dplane_results(struct thread *thread)
  * the dataplane pthread. We enqueue the results here for processing by
  * the main thread later.
  */
-static int rib_dplane_results(const struct zebra_dplane_ctx *ctx)
+static int rib_dplane_results(struct zebra_dplane_ctx *ctx)
 {
        /* Take lock controlling queue of results */
        pthread_mutex_lock(&dplane_mutex);