From c831033fff8775579998463e372d7653e053e6d8 Mon Sep 17 00:00:00 2001 From: Mark Stapp Date: Fri, 31 Aug 2018 13:46:50 -0400 Subject: [PATCH] zebra: dataplane provider enhancements Limit the number of updates processed from the incoming queue; add more stats. Fill out apis for dataplane providers; convert route update processing to provider model; move dataplane status enum Signed-off-by: Mark Stapp --- zebra/zebra_dplane.c | 732 ++++++++++++++++++++++++++++++++++++------- zebra/zebra_dplane.h | 95 +++++- zebra/zebra_rib.c | 5 +- 3 files changed, 695 insertions(+), 137 deletions(-) diff --git a/zebra/zebra_dplane.c b/zebra/zebra_dplane.c index 6cd52f7bb8..c859bdaf6e 100644 --- a/zebra/zebra_dplane.c +++ b/zebra/zebra_dplane.c @@ -42,6 +42,9 @@ DEFINE_MTYPE(ZEBRA, DP_PROV, "Zebra DPlane Provider") const uint32_t DPLANE_DEFAULT_MAX_QUEUED = 200; +/* Default value for new work per cycle */ +const uint32_t DPLANE_DEFAULT_NEW_WORK = 100; + /* Validation check macro for context blocks */ /* #define DPLANE_DEBUG 1 */ @@ -69,6 +72,12 @@ struct zebra_dplane_ctx { /* Status on return */ enum zebra_dplane_result zd_status; + /* Dplane provider id */ + uint32_t zd_provider; + + /* Flags - used by providers, e.g. */ + int zd_flags; + /* TODO -- internal/sub-operation status? */ enum zebra_dplane_result zd_remote_status; enum zebra_dplane_result zd_kernel_status; @@ -118,6 +127,12 @@ struct zebra_dplane_ctx { TAILQ_ENTRY(zebra_dplane_ctx) zd_q_entries; }; +/* Flag that can be set by a pre-kernel provider as a signal that an update + * should bypass the kernel. + */ +#define DPLANE_CTX_FLAG_NO_KERNEL 0x01 + + /* * Registration block for one dataplane provider. 
*/ @@ -131,16 +146,35 @@ struct zebra_dplane_provider { /* Id value */ uint32_t dp_id; + /* Mutex */ + pthread_mutex_t dp_mutex; + + /* Plugin-provided extra data */ + void *dp_data; + + /* Flags */ + int dp_flags; + dplane_provider_process_fp dp_fp; dplane_provider_fini_fp dp_fini; _Atomic uint32_t dp_in_counter; + _Atomic uint32_t dp_in_max; + _Atomic uint32_t dp_out_counter; + _Atomic uint32_t dp_out_max; _Atomic uint32_t dp_error_counter; - /* Embedded list linkage */ - TAILQ_ENTRY(zebra_dplane_provider) dp_q_providers; + /* Queue of contexts inbound to the provider */ + struct dplane_ctx_q dp_ctx_in_q; + + /* Queue of completed contexts outbound from the provider back + * towards the dataplane module. + */ + struct dplane_ctx_q dp_ctx_out_q; + /* Embedded list linkage for provider objects */ + TAILQ_ENTRY(zebra_dplane_provider) dp_prov_link; }; /* @@ -171,10 +205,16 @@ static struct zebra_dplane_globals { /* Limit number of pending, unprocessed updates */ _Atomic uint32_t dg_max_queued_updates; + /* Limit number of new updates dequeued at once, to pace an + * incoming burst. 
+ */ + uint32_t dg_updates_per_cycle; + _Atomic uint32_t dg_routes_in; _Atomic uint32_t dg_routes_queued; _Atomic uint32_t dg_routes_queued_max; _Atomic uint32_t dg_route_errors; + _Atomic uint32_t dg_update_yields; /* Dataplane pthread */ struct frr_pthread *dg_pthread; @@ -191,14 +231,20 @@ static struct zebra_dplane_globals { } zdplane_info; /* - * Lock and unlock for interactions with the zebra 'core' + * Lock and unlock for interactions with the zebra 'core' pthread */ #define DPLANE_LOCK() pthread_mutex_lock(&zdplane_info.dg_mutex) - #define DPLANE_UNLOCK() pthread_mutex_unlock(&zdplane_info.dg_mutex) + +/* + * Lock and unlock for individual providers + */ +#define DPLANE_PROV_LOCK(p) pthread_mutex_lock(&((p)->dp_mutex)) +#define DPLANE_PROV_UNLOCK(p) pthread_mutex_unlock(&((p)->dp_mutex)) + /* Prototypes */ -static int dplane_route_process(struct thread *event); +static int dplane_thread_loop(struct thread *event); /* * Public APIs @@ -285,6 +331,38 @@ enum zebra_dplane_result dplane_ctx_get_status( return ctx->zd_status; } +void dplane_ctx_set_status(struct zebra_dplane_ctx *ctx, + enum zebra_dplane_result status) +{ + DPLANE_CTX_VALID(ctx); + + ctx->zd_status = status; +} + +/* Retrieve last/current provider id */ +uint32_t dplane_ctx_get_provider(const struct zebra_dplane_ctx *ctx) +{ + DPLANE_CTX_VALID(ctx); + return ctx->zd_provider; +} + +/* Providers that run before the kernel can control whether a kernel + * update should be done.
+ */ +void dplane_ctx_set_skip_kernel(struct zebra_dplane_ctx *ctx) +{ + DPLANE_CTX_VALID(ctx); + + SET_FLAG(ctx->zd_flags, DPLANE_CTX_FLAG_NO_KERNEL); +} + +bool dplane_ctx_is_skip_kernel(const struct zebra_dplane_ctx *ctx) +{ + DPLANE_CTX_VALID(ctx); + + return CHECK_FLAG(ctx->zd_flags, DPLANE_CTX_FLAG_NO_KERNEL); +} + enum dplane_op_e dplane_ctx_get_op(const struct zebra_dplane_ctx *ctx) { DPLANE_CTX_VALID(ctx); @@ -517,6 +595,7 @@ const struct zebra_dplane_info *dplane_ctx_get_ns( * End of dplane context accessors */ + /* * Retrieve the limit on the number of pending, unprocessed updates. */ @@ -678,34 +757,11 @@ static int dplane_route_enqueue(struct zebra_dplane_ctx *ctx) } /* Ensure that an event for the dataplane thread is active */ - thread_add_event(zdplane_info.dg_master, dplane_route_process, NULL, 0, - &zdplane_info.dg_t_update); - - ret = AOK; + ret = dplane_provider_work_ready(); return ret; } -/* - * Attempt to dequeue a route-update block - */ -static struct zebra_dplane_ctx *dplane_route_dequeue(void) -{ - struct zebra_dplane_ctx *ctx = NULL; - - DPLANE_LOCK(); - { - ctx = TAILQ_FIRST(&zdplane_info.dg_route_ctx_q); - if (ctx) { - TAILQ_REMOVE(&zdplane_info.dg_route_ctx_q, - ctx, zd_q_entries); - } - } - DPLANE_UNLOCK(); - - return ctx; -} - /* * Utility that prepares a route update and enqueues it for processing */ @@ -828,68 +884,15 @@ done: return ret; } -/* - * Event handler function for routing updates - */ -static int dplane_route_process(struct thread *event) -{ - enum zebra_dplane_result res; - struct zebra_dplane_ctx *ctx; - - while (1) { - /* Check for shutdown */ - if (!zdplane_info.dg_run) - break; - - /* TODO -- limit number of updates per cycle? 
*/ - ctx = dplane_route_dequeue(); - if (ctx == NULL) - break; - - /* Update counter */ - atomic_fetch_sub_explicit(&zdplane_info.dg_routes_queued, 1, - memory_order_relaxed); - - if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) { - char dest_str[PREFIX_STRLEN]; - - prefix2str(dplane_ctx_get_dest(ctx), - dest_str, sizeof(dest_str)); - - zlog_debug("%u:%s Dplane route update ctx %p op %s", - dplane_ctx_get_vrf(ctx), dest_str, - ctx, dplane_op2str(dplane_ctx_get_op(ctx))); - } - - /* TODO -- support series of providers */ - - /* Initially, just doing kernel-facing update here */ - res = kernel_route_update(ctx); - - if (res != ZEBRA_DPLANE_REQUEST_SUCCESS) - atomic_fetch_add_explicit(&zdplane_info.dg_route_errors, - 1, memory_order_relaxed); - - ctx->zd_status = res; - - /* Enqueue result to zebra main context */ - zdplane_info.dg_results_cb(ctx); - - ctx = NULL; - } - - return 0; -} - /* * Handler for 'show dplane' */ int dplane_show_helper(struct vty *vty, bool detailed) { - uint64_t queued, limit, queue_max, errs, incoming; + uint64_t queued, queue_max, limit, errs, incoming, yields; /* Using atomics because counters are being changed in different - * contexts. + * pthread contexts. 
*/ incoming = atomic_load_explicit(&zdplane_info.dg_routes_in, memory_order_relaxed); @@ -901,12 +904,16 @@ int dplane_show_helper(struct vty *vty, bool detailed) memory_order_relaxed); errs = atomic_load_explicit(&zdplane_info.dg_route_errors, memory_order_relaxed); + yields = atomic_load_explicit(&zdplane_info.dg_update_yields, + memory_order_relaxed); - vty_out(vty, "Route updates: %"PRIu64"\n", incoming); + vty_out(vty, "Zebra dataplane:\nRoute updates: %"PRIu64"\n", + incoming); vty_out(vty, "Route update errors: %"PRIu64"\n", errs); vty_out(vty, "Route update queue limit: %"PRIu64"\n", limit); vty_out(vty, "Route update queue depth: %"PRIu64"\n", queued); vty_out(vty, "Route update queue max: %"PRIu64"\n", queue_max); + vty_out(vty, "Route update yields: %"PRIu64"\n", yields); return CMD_SUCCESS; } @@ -916,8 +923,35 @@ int dplane_show_helper(struct vty *vty, bool detailed) */ int dplane_show_provs_helper(struct vty *vty, bool detailed) { - vty_out(vty, "Zebra dataplane providers:%s\n", - (detailed ? 
" (detailed)" : "")); + struct zebra_dplane_provider *prov; + uint64_t in, in_max, out, out_max; + + vty_out(vty, "Zebra dataplane providers:\n"); + + DPLANE_LOCK(); + prov = TAILQ_FIRST(&zdplane_info.dg_providers_q); + DPLANE_UNLOCK(); + + /* Show counters, useful info from each registered provider */ + while (prov) { + + in = atomic_load_explicit(&prov->dp_in_counter, + memory_order_relaxed); + in_max = atomic_load_explicit(&prov->dp_in_max, + memory_order_relaxed); + out = atomic_load_explicit(&prov->dp_out_counter, + memory_order_relaxed); + out_max = atomic_load_explicit(&prov->dp_out_max, + memory_order_relaxed); + + vty_out(vty, "%s (%u): in: %"PRIu64", max: %"PRIu64", " + "out: %"PRIu64", max: %"PRIu64"\n", + prov->dp_name, prov->dp_id, in, in_max, out, out_max); + + DPLANE_LOCK(); + prov = TAILQ_NEXT(prov, dp_prov_link); + DPLANE_UNLOCK(); + } return CMD_SUCCESS; } @@ -926,9 +960,11 @@ int dplane_show_provs_helper(struct vty *vty, bool detailed) * Provider registration */ int dplane_provider_register(const char *name, - enum dplane_provider_prio_e prio, + enum dplane_provider_prio prio, + int flags, dplane_provider_process_fp fp, - dplane_provider_fini_fp fini_fp) + dplane_provider_fini_fp fini_fp, + void *data) { int ret = 0; struct zebra_dplane_provider *p, *last; @@ -952,37 +988,160 @@ int dplane_provider_register(const char *name, goto done; } - strncpy(p->dp_name, name, DPLANE_PROVIDER_NAMELEN); - p->dp_name[DPLANE_PROVIDER_NAMELEN] = '\0'; /* Belt-and-suspenders */ + pthread_mutex_init(&(p->dp_mutex), NULL); + TAILQ_INIT(&(p->dp_ctx_in_q)); + TAILQ_INIT(&(p->dp_ctx_out_q)); p->dp_priority = prio; p->dp_fp = fp; p->dp_fini = fini_fp; + p->dp_data = data; - /* Lock the lock - the dplane pthread may be running */ + /* Lock - the dplane pthread may be running */ DPLANE_LOCK(); p->dp_id = ++zdplane_info.dg_provider_id; + if (name) + strlcpy(p->dp_name, name, DPLANE_PROVIDER_NAMELEN); + else + snprintf(p->dp_name, DPLANE_PROVIDER_NAMELEN, + "provider-%u", 
p->dp_id); + /* Insert into list ordered by priority */ - TAILQ_FOREACH(last, &zdplane_info.dg_providers_q, dp_q_providers) { + TAILQ_FOREACH(last, &zdplane_info.dg_providers_q, dp_prov_link) { if (last->dp_priority > p->dp_priority) break; } if (last) - TAILQ_INSERT_BEFORE(last, p, dp_q_providers); + TAILQ_INSERT_BEFORE(last, p, dp_prov_link); else TAILQ_INSERT_TAIL(&zdplane_info.dg_providers_q, p, - dp_q_providers); + dp_prov_link); /* And unlock */ DPLANE_UNLOCK(); + if (IS_ZEBRA_DEBUG_DPLANE) + zlog_debug("dplane: registered new provider '%s' (%u), prio %d", + p->dp_name, p->dp_id, p->dp_priority); + done: return ret; } +/* Accessors for provider attributes */ +const char *dplane_provider_get_name(const struct zebra_dplane_provider *prov) +{ + return prov->dp_name; +} + +uint32_t dplane_provider_get_id(const struct zebra_dplane_provider *prov) +{ + return prov->dp_id; +} + +void *dplane_provider_get_data(const struct zebra_dplane_provider *prov) +{ + return prov->dp_data; +} + +int dplane_provider_get_work_limit(const struct zebra_dplane_provider *prov) +{ + return zdplane_info.dg_updates_per_cycle; +} + +/* + * Dequeue and maintain associated counter + */ +struct zebra_dplane_ctx *dplane_provider_dequeue_in_ctx( + struct zebra_dplane_provider *prov) +{ + struct zebra_dplane_ctx *ctx = NULL; + + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_LOCK(prov); + + ctx = TAILQ_FIRST(&(prov->dp_ctx_in_q)); + if (ctx) { + TAILQ_REMOVE(&(prov->dp_ctx_in_q), ctx, zd_q_entries); + } + + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_UNLOCK(prov); + + return ctx; +} + +/* + * Dequeue work to a list, return count + */ +int dplane_provider_dequeue_in_list(struct zebra_dplane_provider *prov, + struct dplane_ctx_q *listp) +{ + int limit, ret; + struct zebra_dplane_ctx *ctx; + + limit = zdplane_info.dg_updates_per_cycle; + + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_LOCK(prov); + + for (ret = 0; ret < limit; ret++) { + ctx = 
TAILQ_FIRST(&(prov->dp_ctx_in_q)); + if (ctx) { + TAILQ_REMOVE(&(prov->dp_ctx_in_q), ctx, zd_q_entries); + + TAILQ_INSERT_TAIL(listp, ctx, zd_q_entries); + } else { + break; + } + } + + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_UNLOCK(prov); + + return ret; +} + +/* + * Enqueue and maintain associated counter + */ +void dplane_provider_enqueue_out_ctx(struct zebra_dplane_provider *prov, + struct zebra_dplane_ctx *ctx) +{ + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_LOCK(prov); + + TAILQ_INSERT_TAIL(&(prov->dp_ctx_out_q), ctx, + zd_q_entries); + + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_UNLOCK(prov); + + atomic_fetch_add_explicit(&(prov->dp_out_counter), 1, + memory_order_relaxed); +} + +bool dplane_provider_is_threaded(const struct zebra_dplane_provider *prov) +{ + return (prov->dp_flags & DPLANE_PROV_FLAG_THREADED); +} + +/* + * Provider api to signal that work/events are available + * for the dataplane pthread. + */ +int dplane_provider_work_ready(void) +{ + thread_add_event(zdplane_info.dg_master, + dplane_thread_loop, NULL, 0, + &zdplane_info.dg_t_update); + + return AOK; +} + /* * Zebra registers a results callback with the dataplane system */ @@ -993,37 +1152,154 @@ int dplane_results_register(dplane_results_fp fp) } /* - * Initialize the dataplane module during startup, internal/private version + * Kernel dataplane provider */ -static void zebra_dplane_init_internal(struct zebra_t *zebra) + +/* + * Kernel provider callback + */ +static int kernel_dplane_process_func(struct zebra_dplane_provider *prov) { - memset(&zdplane_info, 0, sizeof(zdplane_info)); + enum zebra_dplane_result res; + struct zebra_dplane_ctx *ctx; + int counter, limit; - pthread_mutex_init(&zdplane_info.dg_mutex, NULL); + limit = dplane_provider_get_work_limit(prov); - TAILQ_INIT(&zdplane_info.dg_route_ctx_q); - TAILQ_INIT(&zdplane_info.dg_providers_q); + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane provider '%s': processing", + 
dplane_provider_get_name(prov)); - zdplane_info.dg_max_queued_updates = DPLANE_DEFAULT_MAX_QUEUED; + for (counter = 0; counter < limit; counter++) { - /* TODO -- register default kernel 'provider' during init */ - zdplane_info.dg_run = true; + ctx = dplane_provider_dequeue_in_ctx(prov); + if (ctx == NULL) + break; - /* Start dataplane pthread */ + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) { + char dest_str[PREFIX_STRLEN]; - zdplane_info.dg_run = true; + prefix2str(dplane_ctx_get_dest(ctx), + dest_str, sizeof(dest_str)); - struct frr_pthread_attr pattr = { - .start = frr_pthread_attr_default.start, - .stop = frr_pthread_attr_default.stop - }; + zlog_debug("%u:%s Dplane route update ctx %p op %s", + dplane_ctx_get_vrf(ctx), dest_str, + ctx, dplane_op2str(dplane_ctx_get_op(ctx))); + } - zdplane_info.dg_pthread = frr_pthread_new(&pattr, "Zebra dplane thread", - "Zebra dplane"); + /* Call into the synchronous kernel-facing code here */ + res = kernel_route_update(ctx); - zdplane_info.dg_master = zdplane_info.dg_pthread->master; + if (res != ZEBRA_DPLANE_REQUEST_SUCCESS) + atomic_fetch_add_explicit( + &zdplane_info.dg_route_errors, 1, + memory_order_relaxed); - frr_pthread_run(zdplane_info.dg_pthread, NULL); + dplane_ctx_set_status(ctx, res); + + dplane_provider_enqueue_out_ctx(prov, ctx); + } + + /* Ensure that we'll run the work loop again if there's still + * more work to do. 
+ */ + if (counter >= limit) { + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane provider '%s' reached max updates %d", + dplane_provider_get_name(prov), counter); + + atomic_fetch_add_explicit(&zdplane_info.dg_update_yields, + 1, memory_order_relaxed); + + dplane_provider_work_ready(); + } + + return 0; +} + +/* + * Test dataplane provider plugin + */ + +/* + * Test provider process callback + */ +static int test_dplane_process_func(struct zebra_dplane_provider *prov) +{ + struct zebra_dplane_ctx *ctx; + int counter, limit; + + /* Just moving from 'in' queue to 'out' queue */ + + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane provider '%s': processing", + dplane_provider_get_name(prov)); + + limit = dplane_provider_get_work_limit(prov); + + for (counter = 0; counter < limit; counter++) { + + ctx = dplane_provider_dequeue_in_ctx(prov); + if (ctx == NULL) + break; + + dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS); + + dplane_provider_enqueue_out_ctx(prov, ctx); + } + + /* Ensure that we'll run the work loop again if there's still + * more work to do. + */ + if (counter >= limit) + dplane_provider_work_ready(); + + return 0; +} + +/* + * Test provider shutdown/fini callback + */ +static int test_dplane_shutdown_func(struct zebra_dplane_provider *prov, + bool early) +{ + if (IS_ZEBRA_DEBUG_DPLANE) + zlog_debug("dplane provider '%s': %sshutdown", + dplane_provider_get_name(prov), + early ? "early " : ""); + + return 0; +} + +/* + * Register default kernel provider + */ +static void dplane_provider_init(void) +{ + int ret; + + ret = dplane_provider_register("Kernel", + DPLANE_PRIO_KERNEL, + DPLANE_PROV_FLAGS_DEFAULT, + kernel_dplane_process_func, + NULL, + NULL); + + if (ret != AOK) + zlog_err("Unable to register kernel dplane provider: %d", + ret); + + /* TODO -- make the test provider optional... 
*/ + ret = dplane_provider_register("Test", + DPLANE_PRIO_PRE_KERNEL, + DPLANE_PROV_FLAGS_DEFAULT, + test_dplane_process_func, + test_dplane_shutdown_func, + NULL /* data */); + + if (ret != AOK) + zlog_err("Unable to register test dplane provider: %d", + ret); } /* Indicates zebra shutdown/exit is in progress. Some operations may be @@ -1059,7 +1335,9 @@ static bool dplane_work_pending(void) { struct zebra_dplane_ctx *ctx; - /* TODO -- just checking incoming/pending work for now */ + /* TODO -- just checking incoming/pending work for now, must check + * providers + */ DPLANE_LOCK(); { ctx = TAILQ_FIRST(&zdplane_info.dg_route_ctx_q); @@ -1120,6 +1398,184 @@ void zebra_dplane_finish(void) &zdplane_info.dg_t_shutdown_check); } +/* + * Main dataplane pthread event loop. The thread takes new incoming work + * and offers it to the first provider. It then iterates through the + * providers, taking complete work from each one and offering it + * to the next in order. At each step, a limited number of updates are + * processed during a cycle in order to provide some fairness. + */ +static int dplane_thread_loop(struct thread *event) +{ + struct dplane_ctx_q work_list; + struct dplane_ctx_q error_list; + struct zebra_dplane_provider *prov; + struct zebra_dplane_ctx *ctx, *tctx; + int limit, counter, error_counter; + uint64_t lval; + + /* Capture work limit per cycle */ + limit = zdplane_info.dg_updates_per_cycle; + + TAILQ_INIT(&work_list); + + /* Check for zebra shutdown */ + if (!zdplane_info.dg_run) + goto done; + + /* Dequeue some incoming work from zebra (if any) onto the temporary + * working list. 
+ */ + DPLANE_LOCK(); + + /* Locate initial registered provider */ + prov = TAILQ_FIRST(&zdplane_info.dg_providers_q); + + for (counter = 0; counter < limit; counter++) { + ctx = TAILQ_FIRST(&zdplane_info.dg_route_ctx_q); + if (ctx) { + TAILQ_REMOVE(&zdplane_info.dg_route_ctx_q, ctx, + zd_q_entries); + + atomic_fetch_sub_explicit( + &zdplane_info.dg_routes_queued, 1, + memory_order_relaxed); + + ctx->zd_provider = prov->dp_id; + + TAILQ_INSERT_TAIL(&work_list, ctx, zd_q_entries); + } else { + break; + } + } + + DPLANE_UNLOCK(); + + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane: incoming new work counter: %d", counter); + + /* Iterate through the registered providers, offering new incoming + * work. If the provider has outgoing work in its queue, take that + * work for the next provider + */ + while (prov) { + + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane enqueues %d new work to provider '%s'", + counter, dplane_provider_get_name(prov)); + + /* Capture current provider id in each context; check for + * error status. + */ + TAILQ_FOREACH_SAFE(ctx, &work_list, zd_q_entries, tctx) { + if (dplane_ctx_get_status(ctx) == + ZEBRA_DPLANE_REQUEST_SUCCESS) { + ctx->zd_provider = prov->dp_id; + } else { + /* + * TODO -- improve error-handling: recirc + * errors backwards so that providers can + * 'undo' their work (if they want to) + */ + + /* Move to error list; will be returned + * to zebra main.
+ */ + TAILQ_REMOVE(&work_list, ctx, zd_q_entries); + TAILQ_INSERT_TAIL(&error_list, + ctx, zd_q_entries); + error_counter++; + } + } + + /* Enqueue new work to the provider */ + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_LOCK(prov); + + if (TAILQ_FIRST(&work_list)) + TAILQ_CONCAT(&(prov->dp_ctx_in_q), &work_list, + zd_q_entries); + + lval = atomic_add_fetch_explicit(&prov->dp_in_counter, counter, + memory_order_relaxed); + if (lval > prov->dp_in_max) + atomic_store_explicit(&prov->dp_in_max, lval, + memory_order_relaxed); + + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_UNLOCK(prov); + + /* Reset the temp list */ + TAILQ_INIT(&work_list); + counter = 0; + + /* Call into the provider code */ + (*prov->dp_fp)(prov); + + /* Check for zebra shutdown */ + if (!zdplane_info.dg_run) + break; + + /* Dequeue completed work from the provider */ + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_LOCK(prov); + + while (counter < limit) { + ctx = TAILQ_FIRST(&(prov->dp_ctx_out_q)); + if (ctx) { + TAILQ_REMOVE(&(prov->dp_ctx_out_q), ctx, + zd_q_entries); + + TAILQ_INSERT_TAIL(&work_list, + ctx, zd_q_entries); + counter++; + } else + break; + } + + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_UNLOCK(prov); + + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane dequeues %d completed work from provider %s", + counter, dplane_provider_get_name(prov)); + + /* Locate next provider */ + DPLANE_LOCK(); + prov = TAILQ_NEXT(prov, dp_prov_link); + DPLANE_UNLOCK(); + + if (prov) { + TAILQ_FOREACH(ctx, &work_list, zd_q_entries) + ctx->zd_provider = prov->dp_id; + } + } + + /* After all providers have been serviced, enqueue any completed + * work back to zebra so it can process the results. 
+ */ + + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane has %d completed work for zebra main", + counter); + + /* + * TODO -- I'd rather hand lists through the api to zebra main, + * to reduce the number of lock/unlock cycles + */ + for (ctx = TAILQ_FIRST(&work_list); ctx; ) { + TAILQ_REMOVE(&work_list, ctx, zd_q_entries); + + /* Call through to zebra main */ + (*zdplane_info.dg_results_cb)(ctx); + + ctx = TAILQ_FIRST(&work_list); + } + +done: + return 0; +} + /* * Final phase of shutdown, after all work enqueued to dplane has been * processed. This is called from the zebra main pthread context. @@ -1142,11 +1598,51 @@ void zebra_dplane_shutdown(void) zdplane_info.dg_pthread = NULL; zdplane_info.dg_master = NULL; - /* Notify provider(s) of final shutdown */ + /* TODO -- Notify provider(s) of final shutdown */ + + /* TODO -- Clean-up provider objects */ + + /* TODO -- Clean queue(s), free memory */ +} + +/* + * Initialize the dataplane module during startup, internal/private version + */ +static void zebra_dplane_init_internal(struct zebra_t *zebra) +{ + memset(&zdplane_info, 0, sizeof(zdplane_info)); + + pthread_mutex_init(&zdplane_info.dg_mutex, NULL); - /* Clean-up provider objects */ + TAILQ_INIT(&zdplane_info.dg_route_ctx_q); + TAILQ_INIT(&zdplane_info.dg_providers_q); - /* Clean queue(s) */ + zdplane_info.dg_updates_per_cycle = DPLANE_DEFAULT_NEW_WORK; + + zdplane_info.dg_max_queued_updates = DPLANE_DEFAULT_MAX_QUEUED; + + /* Register default kernel 'provider' during init */ + dplane_provider_init(); + + /* Start dataplane pthread */ + + zdplane_info.dg_run = true; + + struct frr_pthread_attr pattr = { + .start = frr_pthread_attr_default.start, + .stop = frr_pthread_attr_default.stop + }; + + zdplane_info.dg_pthread = frr_pthread_new(&pattr, "Zebra dplane thread", + "Zebra dplane"); + + zdplane_info.dg_master = zdplane_info.dg_pthread->master; + + /* Enqueue an initial event for the dataplane pthread */ + thread_add_event(zdplane_info.dg_master, 
dplane_thread_loop, NULL, 0, + &zdplane_info.dg_t_update); + + frr_pthread_run(zdplane_info.dg_pthread, NULL); } /* diff --git a/zebra/zebra_dplane.h b/zebra/zebra_dplane.h index 999e0f39e4..844fd59600 100644 --- a/zebra/zebra_dplane.h +++ b/zebra/zebra_dplane.h @@ -29,7 +29,6 @@ #include "zebra/rib.h" #include "zebra/zserv.h" - /* Key netlink info from zebra ns */ struct zebra_dplane_info { ns_id_t ns_id; @@ -127,6 +126,12 @@ void dplane_ctx_fini(struct zebra_dplane_ctx **pctx); void dplane_ctx_enqueue_tail(struct dplane_ctx_q *q, const struct zebra_dplane_ctx *ctx); +/* Append a list of context blocks to another list - again, just keeping + * the context struct opaque. + */ +void dplane_ctx_list_append(struct dplane_ctx_q *to_list, + struct dplane_ctx_q *from_list); + /* Dequeue a context block from the head of caller's tailq */ void dplane_ctx_dequeue(struct dplane_ctx_q *q, struct zebra_dplane_ctx **ctxp); @@ -135,6 +140,8 @@ void dplane_ctx_dequeue(struct dplane_ctx_q *q, struct zebra_dplane_ctx **ctxp); */ enum zebra_dplane_result dplane_ctx_get_status( const struct zebra_dplane_ctx *ctx); +void dplane_ctx_set_status(struct zebra_dplane_ctx *ctx, + enum zebra_dplane_result status); const char *dplane_res2str(enum zebra_dplane_result res); enum dplane_op_e dplane_ctx_get_op(const struct zebra_dplane_ctx *ctx); @@ -142,6 +149,15 @@ const char *dplane_op2str(enum dplane_op_e op); const struct prefix *dplane_ctx_get_dest(const struct zebra_dplane_ctx *ctx); +/* Retrieve last/current provider id */ +uint32_t dplane_ctx_get_provider(const struct zebra_dplane_ctx *ctx); + +/* Providers running before the kernel can control whether a kernel + * update should be done. 
+ */ +void dplane_ctx_set_skip_kernel(struct zebra_dplane_ctx *ctx); +bool dplane_ctx_is_skip_kernel(const struct zebra_dplane_ctx *ctx); + /* Source prefix is a little special - use convention to return NULL * to mean "no src prefix" */ @@ -212,9 +228,11 @@ int dplane_show_provs_helper(struct vty *vty, bool detailed); /* - * Dataplane providers: modules that consume dataplane events. + * Dataplane providers: modules that process or consume dataplane events. */ +struct zebra_dplane_provider; + /* Support string name for a dataplane provider */ #define DPLANE_PROVIDER_NAMELEN 64 @@ -223,7 +241,7 @@ int dplane_show_provs_helper(struct vty *vty, bool detailed); * followed by the kernel, followed by some post-processing step (such as * the fpm output stream.) */ -enum dplane_provider_prio_e { +enum dplane_provider_prio { DPLANE_PRIO_NONE = 0, DPLANE_PRIO_PREPROCESS, DPLANE_PRIO_PRE_KERNEL, @@ -232,28 +250,72 @@ enum dplane_provider_prio_e { DPLANE_PRIO_LAST }; -/* Provider's entry-point to process a context block */ -typedef int (*dplane_provider_process_fp)(struct zebra_dplane_ctx *ctx); +/* Provider's entry-point for incoming work, called in the context of the + * dataplane pthread. The dataplane pthread enqueues any new work to the + * provider's 'inbound' queue, then calls the callback. The dataplane + * then checks the provider's outbound queue. + */ +typedef int (*dplane_provider_process_fp)(struct zebra_dplane_provider *prov); + +/* Provider's entry-point for shutdown and cleanup. Called with 'early' + * during shutdown, to indicate that the dataplane subsystem is allowing + * work to move through the providers and finish. When called without 'early', + * the provider should release all resources (if it has any allocated). + */ +typedef int (*dplane_provider_fini_fp)(struct zebra_dplane_provider *prov, + bool early); + +/* Flags values used during provider registration. 
*/ +#define DPLANE_PROV_FLAGS_DEFAULT 0x0 + +/* Provider will be spawning its own worker thread */ +#define DPLANE_PROV_FLAG_THREADED 0x1 -/* Provider's entry-point for shutdown and cleanup */ -typedef int (*dplane_provider_fini_fp)(void); -/* Provider registration */ +/* Provider registration: ordering or priority value, callbacks, and optional + * opaque data value. + */ int dplane_provider_register(const char *name, - enum dplane_provider_prio_e prio, + enum dplane_provider_prio prio, + int flags, dplane_provider_process_fp fp, - dplane_provider_fini_fp fini_fp); + dplane_provider_fini_fp fini_fp, + void *data); -/* - * Results are returned to zebra core via a callback +/* Accessors for provider attributes */ +const char *dplane_provider_get_name(const struct zebra_dplane_provider *prov); +uint32_t dplane_provider_get_id(const struct zebra_dplane_provider *prov); +void *dplane_provider_get_data(const struct zebra_dplane_provider *prov); +bool dplane_provider_is_threaded(const struct zebra_dplane_provider *prov); + +/* Providers should limit number of updates per work cycle */ +int dplane_provider_get_work_limit(const struct zebra_dplane_provider *prov); + +/* Provider api to signal that work/events are available + * for the dataplane pthread. */ -typedef int (*dplane_results_fp)(const struct zebra_dplane_ctx *ctx); +int dplane_provider_work_ready(void); + +/* Dequeue, maintain associated counter and locking */ +struct zebra_dplane_ctx *dplane_provider_dequeue_in_ctx( + struct zebra_dplane_provider *prov); + +/* Dequeue work to a list, maintain counter and locking, return count */ +int dplane_provider_dequeue_in_list(struct zebra_dplane_provider *prov, + struct dplane_ctx_q *listp); + +/* Enqueue, maintain associated counter and locking */ +void dplane_provider_enqueue_out_ctx(struct zebra_dplane_provider *prov, + struct zebra_dplane_ctx *ctx); /* * Zebra registers a results callback with the dataplane. 
The callback is - * called in the dataplane thread context, so the expectation is that the - * context is queued (or that processing is very limited). + * called in the dataplane pthread context, so the expectation is that the + * context is queued for the zebra main pthread or that processing + * is very limited. */ +typedef int (*dplane_results_fp)(struct zebra_dplane_ctx *ctx); + int dplane_results_register(dplane_results_fp fp); /* @@ -264,7 +326,8 @@ void zebra_dplane_init(void); /* Finalize/cleanup apis, one called early as shutdown is starting, * one called late at the end of zebra shutdown, and then one called - * from the zebra main thread to stop the dplane thread free all resources. + * from the zebra main pthread to stop the dplane pthread and + * free all resources. * * Zebra expects to try to clean up all vrfs and all routes during * shutdown, so the dplane must be available until very late. diff --git a/zebra/zebra_rib.c b/zebra/zebra_rib.c index 8285392527..f161d5875b 100644 --- a/zebra/zebra_rib.c +++ b/zebra/zebra_rib.c @@ -1932,11 +1932,10 @@ static void rib_process_after(struct zebra_dplane_ctx *ctx) op = dplane_ctx_get_op(ctx); status = dplane_ctx_get_status(ctx); - if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) { + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) zlog_debug("%u:%s Processing dplane ctx %p, op %s result %s", dplane_ctx_get_vrf(ctx), dest_str, ctx, dplane_op2str(op), dplane_res2str(status)); - } if (op == DPLANE_OP_ROUTE_DELETE) { /* @@ -3289,7 +3288,7 @@ static int rib_process_dplane_results(struct thread *thread) * the dataplane pthread. We enqueue the results here for processing by * the main thread later. */ -static int rib_dplane_results(const struct zebra_dplane_ctx *ctx) +static int rib_dplane_results(struct zebra_dplane_ctx *ctx) { /* Take lock controlling queue of results */ pthread_mutex_lock(&dplane_mutex); -- 2.39.5