diff options
Diffstat (limited to 'zebra/zebra_dplane.c')
| -rw-r--r-- | zebra/zebra_dplane.c | 895 |
1 files changed, 771 insertions, 124 deletions
diff --git a/zebra/zebra_dplane.c b/zebra/zebra_dplane.c index 3e61418b64..ba0f1b41aa 100644 --- a/zebra/zebra_dplane.c +++ b/zebra/zebra_dplane.c @@ -38,9 +38,14 @@ DEFINE_MTYPE(ZEBRA, DP_PROV, "Zebra DPlane Provider") # define AOK 0 #endif +/* Enable test dataplane provider */ +/*#define DPLANE_TEST_PROVIDER 1 */ + /* Default value for max queued incoming updates */ const uint32_t DPLANE_DEFAULT_MAX_QUEUED = 200; +/* Default value for new work per cycle */ +const uint32_t DPLANE_DEFAULT_NEW_WORK = 100; /* Validation check macro for context blocks */ /* #define DPLANE_DEBUG 1 */ @@ -69,6 +74,12 @@ struct zebra_dplane_ctx { /* Status on return */ enum zebra_dplane_result zd_status; + /* Dplane provider id */ + uint32_t zd_provider; + + /* Flags - used by providers, e.g. */ + int zd_flags; + /* TODO -- internal/sub-operation status? */ enum zebra_dplane_result zd_remote_status; enum zebra_dplane_result zd_kernel_status; @@ -118,6 +129,12 @@ struct zebra_dplane_ctx { TAILQ_ENTRY(zebra_dplane_ctx) zd_q_entries; }; +/* Flag that can be set by a pre-kernel provider as a signal that an update + * should bypass the kernel. + */ +#define DPLANE_CTX_FLAG_NO_KERNEL 0x01 + + /* * Registration block for one dataplane provider. */ @@ -131,16 +148,37 @@ struct zebra_dplane_provider { /* Id value */ uint32_t dp_id; + /* Mutex */ + pthread_mutex_t dp_mutex; + + /* Plugin-provided extra data */ + void *dp_data; + + /* Flags */ + int dp_flags; + dplane_provider_process_fp dp_fp; dplane_provider_fini_fp dp_fini; _Atomic uint32_t dp_in_counter; + _Atomic uint32_t dp_in_queued; + _Atomic uint32_t dp_in_max; + _Atomic uint32_t dp_out_counter; + _Atomic uint32_t dp_out_queued; + _Atomic uint32_t dp_out_max; _Atomic uint32_t dp_error_counter; - /* Embedded list linkage */ - TAILQ_ENTRY(zebra_dplane_provider) dp_q_providers; + /* Queue of contexts inbound to the provider */ + struct dplane_ctx_q dp_ctx_in_q; + /* Queue of completed contexts outbound from the provider back + * towards the dataplane module. + */ + struct dplane_ctx_q dp_ctx_out_q; + + /* Embedded list linkage for provider objects */ + TAILQ_ENTRY(zebra_dplane_provider) dp_prov_link; }; /* @@ -171,10 +209,19 @@ static struct zebra_dplane_globals { /* Limit number of pending, unprocessed updates */ _Atomic uint32_t dg_max_queued_updates; + /* Limit number of new updates dequeued at once, to pace an + * incoming burst. + */ + uint32_t dg_updates_per_cycle; + _Atomic uint32_t dg_routes_in; _Atomic uint32_t dg_routes_queued; _Atomic uint32_t dg_routes_queued_max; _Atomic uint32_t dg_route_errors; + _Atomic uint32_t dg_update_yields; + + /* Dataplane pthread */ + struct frr_pthread *dg_pthread; /* Event-delivery context 'master' for the dplane */ struct thread_master *dg_master; @@ -188,19 +235,33 @@ static struct zebra_dplane_globals { } zdplane_info; /* - * Lock and unlock for interactions with the zebra 'core' + * Lock and unlock for interactions with the zebra 'core' pthread */ #define DPLANE_LOCK() pthread_mutex_lock(&zdplane_info.dg_mutex) - #define DPLANE_UNLOCK() pthread_mutex_unlock(&zdplane_info.dg_mutex) + +/* + * Lock and unlock for individual providers + */ +#define DPLANE_PROV_LOCK(p) pthread_mutex_lock(&((p)->dp_mutex)) +#define DPLANE_PROV_UNLOCK(p) pthread_mutex_unlock(&((p)->dp_mutex)) + /* Prototypes */ -static int dplane_route_process(struct thread *event); +static int dplane_thread_loop(struct thread *event); +static void dplane_info_from_zns(struct zebra_dplane_info *ns_info, + struct zebra_ns *zns); /* * Public APIs */ +/* Obtain thread_master for dataplane thread */ +struct thread_master *dplane_get_thread_master(void) +{ + return zdplane_info.dg_master; +} + /* * Allocate a dataplane update context */ @@ -249,7 +310,7 @@ static void dplane_ctx_free(struct zebra_dplane_ctx **pctx) */ void dplane_ctx_fini(struct zebra_dplane_ctx **pctx) { - /* TODO -- enqueue for next provider; for now, just free */ + /* TODO -- maintain pool; for now, just free */ dplane_ctx_free(pctx); } @@ -260,15 +321,27 @@ void dplane_ctx_enqueue_tail(struct dplane_ctx_q *q, TAILQ_INSERT_TAIL(q, (struct zebra_dplane_ctx *)ctx, zd_q_entries); } +/* Append a list of context blocks to another list */ +void dplane_ctx_list_append(struct dplane_ctx_q *to_list, + struct dplane_ctx_q *from_list) +{ + if (TAILQ_FIRST(from_list)) { + TAILQ_CONCAT(to_list, from_list, zd_q_entries); + + /* And clear 'from' list */ + TAILQ_INIT(from_list); + } +} + /* Dequeue a context block from the head of a list */ -void dplane_ctx_dequeue(struct dplane_ctx_q *q, struct zebra_dplane_ctx **ctxp) +struct zebra_dplane_ctx *dplane_ctx_dequeue(struct dplane_ctx_q *q) { struct zebra_dplane_ctx *ctx = TAILQ_FIRST(q); if (ctx) TAILQ_REMOVE(q, ctx, zd_q_entries); - *ctxp = ctx; + return ctx; } /* @@ -282,6 +355,38 @@ enum zebra_dplane_result dplane_ctx_get_status( return ctx->zd_status; } +void dplane_ctx_set_status(struct zebra_dplane_ctx *ctx, + enum zebra_dplane_result status) +{ + DPLANE_CTX_VALID(ctx); + + ctx->zd_status = status; +} + +/* Retrieve last/current provider id */ +uint32_t dplane_ctx_get_provider(const struct zebra_dplane_ctx *ctx) +{ + DPLANE_CTX_VALID(ctx); + return ctx->zd_provider; +} + +/* Providers run before the kernel can control whether a kernel + * update should be done. + */ +void dplane_ctx_set_skip_kernel(struct zebra_dplane_ctx *ctx) +{ + DPLANE_CTX_VALID(ctx); + + SET_FLAG(ctx->zd_flags, DPLANE_CTX_FLAG_NO_KERNEL); +} + +bool dplane_ctx_is_skip_kernel(const struct zebra_dplane_ctx *ctx) +{ + DPLANE_CTX_VALID(ctx); + + return CHECK_FLAG(ctx->zd_flags, DPLANE_CTX_FLAG_NO_KERNEL); +} + enum dplane_op_e dplane_ctx_get_op(const struct zebra_dplane_ctx *ctx) { DPLANE_CTX_VALID(ctx); @@ -441,7 +546,7 @@ uint16_t dplane_ctx_get_old_instance(const struct zebra_dplane_ctx *ctx) { DPLANE_CTX_VALID(ctx); - return ctx->zd_instance; + return ctx->zd_old_instance; } uint32_t dplane_ctx_get_metric(const struct zebra_dplane_ctx *ctx) @@ -514,6 +619,7 @@ const struct zebra_dplane_info *dplane_ctx_get_ns( * End of dplane context accessors */ + /* * Retrieve the limit on the number of pending, unprocessed updates. */ @@ -565,6 +671,7 @@ static int dplane_ctx_route_init(struct zebra_dplane_ctx *ctx, goto done; ctx->zd_op = op; + ctx->zd_status = ZEBRA_DPLANE_REQUEST_SUCCESS; ctx->zd_type = re->type; ctx->zd_old_type = re->type; @@ -601,16 +708,17 @@ static int dplane_ctx_route_init(struct zebra_dplane_ctx *ctx, zvrf = vrf_info_lookup(re->vrf_id); zns = zvrf->zns; - zebra_dplane_info_from_zns(&(ctx->zd_ns_info), zns, true /*is_cmd*/); + /* Internal copy helper */ + dplane_info_from_zns(&(ctx->zd_ns_info), zns); #if defined(HAVE_NETLINK) /* Increment message counter after copying to context struct - may need * two messages in some 'update' cases. */ if (op == DPLANE_OP_ROUTE_UPDATE) - zns->netlink_cmd.seq += 2; + zns->netlink_dplane.seq += 2; else - zns->netlink_cmd.seq++; + zns->netlink_dplane.seq++; #endif /* NETLINK*/ /* Copy nexthops; recursive info is included too */ @@ -618,7 +726,7 @@ static int dplane_ctx_route_init(struct zebra_dplane_ctx *ctx, /* TODO -- maybe use array of nexthops to avoid allocs? */ - /* Ensure that the dplane's nexthop flag is clear. */ + /* Ensure that the dplane's nexthops flags are clear. */ for (ALL_NEXTHOPS(ctx->zd_ng, nexthop)) UNSET_FLAG(nexthop->flags, NEXTHOP_FLAG_FIB); @@ -675,35 +783,12 @@ static int dplane_route_enqueue(struct zebra_dplane_ctx *ctx) } /* Ensure that an event for the dataplane thread is active */ - thread_add_event(zdplane_info.dg_master, dplane_route_process, NULL, 0, - &zdplane_info.dg_t_update); - - ret = AOK; + ret = dplane_provider_work_ready(); return ret; } /* - * Attempt to dequeue a route-update block - */ -static struct zebra_dplane_ctx *dplane_route_dequeue(void) -{ - struct zebra_dplane_ctx *ctx = NULL; - - DPLANE_LOCK(); - { - ctx = TAILQ_FIRST(&zdplane_info.dg_route_ctx_q); - if (ctx) { - TAILQ_REMOVE(&zdplane_info.dg_route_ctx_q, - ctx, zd_q_entries); - } - } - DPLANE_UNLOCK(); - - return ctx; -} - -/* * Utility that prepares a route update and enqueues it for processing */ static enum zebra_dplane_result @@ -826,67 +911,14 @@ done: } /* - * Event handler function for routing updates - */ -static int dplane_route_process(struct thread *event) -{ - enum zebra_dplane_result res; - struct zebra_dplane_ctx *ctx; - - while (1) { - /* Check for shutdown */ - if (!zdplane_info.dg_run) - break; - - /* TODO -- limit number of updates per cycle? */ - ctx = dplane_route_dequeue(); - if (ctx == NULL) - break; - - /* Update counter */ - atomic_fetch_sub_explicit(&zdplane_info.dg_routes_queued, 1, - memory_order_relaxed); - - if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) { - char dest_str[PREFIX_STRLEN]; - - prefix2str(dplane_ctx_get_dest(ctx), - dest_str, sizeof(dest_str)); - - zlog_debug("%u:%s Dplane route update ctx %p op %s", - dplane_ctx_get_vrf(ctx), dest_str, - ctx, dplane_op2str(dplane_ctx_get_op(ctx))); - } - - /* TODO -- support series of providers */ - - /* Initially, just doing kernel-facing update here */ - res = kernel_route_update(ctx); - - if (res != ZEBRA_DPLANE_REQUEST_SUCCESS) - atomic_fetch_add_explicit(&zdplane_info.dg_route_errors, - 1, memory_order_relaxed); - - ctx->zd_status = res; - - /* Enqueue result to zebra main context */ - zdplane_info.dg_results_cb(ctx); - - ctx = NULL; - } - - return 0; -} - -/* * Handler for 'show dplane' */ int dplane_show_helper(struct vty *vty, bool detailed) { - uint64_t queued, limit, queue_max, errs, incoming; + uint64_t queued, queue_max, limit, errs, incoming, yields; /* Using atomics because counters are being changed in different - * contexts. + * pthread contexts. */ incoming = atomic_load_explicit(&zdplane_info.dg_routes_in, memory_order_relaxed); @@ -898,12 +930,16 @@ int dplane_show_helper(struct vty *vty, bool detailed) memory_order_relaxed); errs = atomic_load_explicit(&zdplane_info.dg_route_errors, memory_order_relaxed); + yields = atomic_load_explicit(&zdplane_info.dg_update_yields, + memory_order_relaxed); - vty_out(vty, "Route updates: %"PRIu64"\n", incoming); + vty_out(vty, "Zebra dataplane:\nRoute updates: %"PRIu64"\n", + incoming); vty_out(vty, "Route update errors: %"PRIu64"\n", errs); vty_out(vty, "Route update queue limit: %"PRIu64"\n", limit); vty_out(vty, "Route update queue depth: %"PRIu64"\n", queued); vty_out(vty, "Route update queue max: %"PRIu64"\n", queue_max); + vty_out(vty, "Route update yields: %"PRIu64"\n", yields); return CMD_SUCCESS; } @@ -913,8 +949,35 @@ int dplane_show_helper(struct vty *vty, bool detailed) */ int dplane_show_provs_helper(struct vty *vty, bool detailed) { - vty_out(vty, "Zebra dataplane providers:%s\n", - (detailed ? " (detailed)" : "")); + struct zebra_dplane_provider *prov; + uint64_t in, in_max, out, out_max; + + vty_out(vty, "Zebra dataplane providers:\n"); + + DPLANE_LOCK(); + prov = TAILQ_FIRST(&zdplane_info.dg_providers_q); + DPLANE_UNLOCK(); + + /* Show counters, useful info from each registered provider */ + while (prov) { + + in = atomic_load_explicit(&prov->dp_in_counter, + memory_order_relaxed); + in_max = atomic_load_explicit(&prov->dp_in_max, + memory_order_relaxed); + out = atomic_load_explicit(&prov->dp_out_counter, + memory_order_relaxed); + out_max = atomic_load_explicit(&prov->dp_out_max, + memory_order_relaxed); + + vty_out(vty, "%s (%u): in: %"PRIu64", q_max: %"PRIu64", " + "out: %"PRIu64", q_max: %"PRIu64"\n", + prov->dp_name, prov->dp_id, in, in_max, out, out_max); + + DPLANE_LOCK(); + prov = TAILQ_NEXT(prov, dp_prov_link); + DPLANE_UNLOCK(); + } return CMD_SUCCESS; } @@ -923,9 +986,11 @@ int dplane_show_provs_helper(struct vty *vty, bool detailed) * Provider registration */ int dplane_provider_register(const char *name, - enum dplane_provider_prio_e prio, + enum dplane_provider_prio prio, + int flags, dplane_provider_process_fp fp, - dplane_provider_fini_fp fini_fp) + dplane_provider_fini_fp fini_fp, + void *data) { int ret = 0; struct zebra_dplane_provider *p, *last; @@ -949,37 +1014,201 @@ int dplane_provider_register(const char *name, goto done; } - strncpy(p->dp_name, name, DPLANE_PROVIDER_NAMELEN); - p->dp_name[DPLANE_PROVIDER_NAMELEN] = '\0'; /* Belt-and-suspenders */ + pthread_mutex_init(&(p->dp_mutex), NULL); + TAILQ_INIT(&(p->dp_ctx_in_q)); + TAILQ_INIT(&(p->dp_ctx_out_q)); p->dp_priority = prio; p->dp_fp = fp; p->dp_fini = fini_fp; + p->dp_data = data; - /* Lock the lock - the dplane pthread may be running */ + /* Lock - the dplane pthread may be running */ DPLANE_LOCK(); p->dp_id = ++zdplane_info.dg_provider_id; + if (name) + strlcpy(p->dp_name, name, DPLANE_PROVIDER_NAMELEN); + else + snprintf(p->dp_name, DPLANE_PROVIDER_NAMELEN, + "provider-%u", p->dp_id); + /* Insert into list ordered by priority */ - TAILQ_FOREACH(last, &zdplane_info.dg_providers_q, dp_q_providers) { + TAILQ_FOREACH(last, &zdplane_info.dg_providers_q, dp_prov_link) { if (last->dp_priority > p->dp_priority) break; } if (last) - TAILQ_INSERT_BEFORE(last, p, dp_q_providers); + TAILQ_INSERT_BEFORE(last, p, dp_prov_link); else TAILQ_INSERT_TAIL(&zdplane_info.dg_providers_q, p, - dp_q_providers); + dp_prov_link); /* And unlock */ DPLANE_UNLOCK(); + if (IS_ZEBRA_DEBUG_DPLANE) + zlog_debug("dplane: registered new provider '%s' (%u), prio %d", + p->dp_name, p->dp_id, p->dp_priority); + done: return ret; } +/* Accessors for provider attributes */ +const char *dplane_provider_get_name(const struct zebra_dplane_provider *prov) +{ + return prov->dp_name; +} + +uint32_t dplane_provider_get_id(const struct zebra_dplane_provider *prov) +{ + return prov->dp_id; +} + +void *dplane_provider_get_data(const struct zebra_dplane_provider *prov) +{ + return prov->dp_data; +} + +int dplane_provider_get_work_limit(const struct zebra_dplane_provider *prov) +{ + return zdplane_info.dg_updates_per_cycle; +} + +/* Lock/unlock a provider's mutex - iff the provider was registered with + * the THREADED flag. + */ +void dplane_provider_lock(struct zebra_dplane_provider *prov) +{ + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_LOCK(prov); +} + +void dplane_provider_unlock(struct zebra_dplane_provider *prov) +{ + if (dplane_provider_is_threaded(prov)) + DPLANE_PROV_UNLOCK(prov); +} + +/* + * Dequeue and maintain associated counter + */ +struct zebra_dplane_ctx *dplane_provider_dequeue_in_ctx( + struct zebra_dplane_provider *prov) +{ + struct zebra_dplane_ctx *ctx = NULL; + + dplane_provider_lock(prov); + + ctx = TAILQ_FIRST(&(prov->dp_ctx_in_q)); + if (ctx) { + TAILQ_REMOVE(&(prov->dp_ctx_in_q), ctx, zd_q_entries); + + atomic_fetch_sub_explicit(&prov->dp_in_queued, 1, + memory_order_relaxed); + } + + dplane_provider_unlock(prov); + + return ctx; +} + +/* + * Dequeue work to a list, return count + */ +int dplane_provider_dequeue_in_list(struct zebra_dplane_provider *prov, + struct dplane_ctx_q *listp) +{ + int limit, ret; + struct zebra_dplane_ctx *ctx; + + limit = zdplane_info.dg_updates_per_cycle; + + dplane_provider_lock(prov); + + for (ret = 0; ret < limit; ret++) { + ctx = TAILQ_FIRST(&(prov->dp_ctx_in_q)); + if (ctx) { + TAILQ_REMOVE(&(prov->dp_ctx_in_q), ctx, zd_q_entries); + + TAILQ_INSERT_TAIL(listp, ctx, zd_q_entries); + } else { + break; + } + } + + if (ret > 0) + atomic_fetch_sub_explicit(&prov->dp_in_queued, ret, + memory_order_relaxed); + + dplane_provider_unlock(prov); + + return ret; +} + +/* + * Enqueue and maintain associated counter + */ +void dplane_provider_enqueue_out_ctx(struct zebra_dplane_provider *prov, + struct zebra_dplane_ctx *ctx) +{ + dplane_provider_lock(prov); + + TAILQ_INSERT_TAIL(&(prov->dp_ctx_out_q), ctx, + zd_q_entries); + + dplane_provider_unlock(prov); + + atomic_fetch_add_explicit(&(prov->dp_out_counter), 1, + memory_order_relaxed); +} + +/* + * Accessor for provider object + */ +bool dplane_provider_is_threaded(const struct zebra_dplane_provider *prov) +{ + return (prov->dp_flags & DPLANE_PROV_FLAG_THREADED); +} + +/* + * Internal helper that copies information from a zebra ns object; this is + * called in the zebra main pthread context as part of dplane ctx init. + */ +static void dplane_info_from_zns(struct zebra_dplane_info *ns_info, + struct zebra_ns *zns) +{ + ns_info->ns_id = zns->ns_id; + +#if defined(HAVE_NETLINK) + ns_info->is_cmd = true; + ns_info->nls = zns->netlink_dplane; +#endif /* NETLINK */ +} + +/* + * Provider api to signal that work/events are available + * for the dataplane pthread. + */ +int dplane_provider_work_ready(void) +{ + /* Note that during zebra startup, we may be offered work before + * the dataplane pthread (and thread-master) are ready. We want to + * enqueue the work, but the event-scheduling machinery may not be + * available. + */ + if (zdplane_info.dg_run) { + thread_add_event(zdplane_info.dg_master, + dplane_thread_loop, NULL, 0, + &zdplane_info.dg_t_update); + } + + return AOK; +} + /* * Zebra registers a results callback with the dataplane system */ @@ -990,27 +1219,163 @@ int dplane_results_register(dplane_results_fp fp) } /* - * Initialize the dataplane module during startup, internal/private version + * Kernel dataplane provider */ -static void zebra_dplane_init_internal(struct zebra_t *zebra) + +/* + * Kernel provider callback + */ +static int kernel_dplane_process_func(struct zebra_dplane_provider *prov) { - memset(&zdplane_info, 0, sizeof(zdplane_info)); + enum zebra_dplane_result res; + struct zebra_dplane_ctx *ctx; + int counter, limit; - pthread_mutex_init(&zdplane_info.dg_mutex, NULL); + limit = dplane_provider_get_work_limit(prov); - TAILQ_INIT(&zdplane_info.dg_route_ctx_q); - TAILQ_INIT(&zdplane_info.dg_providers_q); + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane provider '%s': processing", + dplane_provider_get_name(prov)); - zdplane_info.dg_max_queued_updates = DPLANE_DEFAULT_MAX_QUEUED; + for (counter = 0; counter < limit; counter++) { + + ctx = dplane_provider_dequeue_in_ctx(prov); + if (ctx == NULL) + break; - /* TODO -- register default kernel 'provider' during init */ + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) { + char dest_str[PREFIX_STRLEN]; - zdplane_info.dg_run = true; + prefix2str(dplane_ctx_get_dest(ctx), + dest_str, sizeof(dest_str)); + + zlog_debug("%u:%s Dplane route update ctx %p op %s", + dplane_ctx_get_vrf(ctx), dest_str, + ctx, dplane_op2str(dplane_ctx_get_op(ctx))); + } + + /* Call into the synchronous kernel-facing code here */ + res = kernel_route_update(ctx); + + if (res != ZEBRA_DPLANE_REQUEST_SUCCESS) + atomic_fetch_add_explicit( + &zdplane_info.dg_route_errors, 1, + memory_order_relaxed); + + dplane_ctx_set_status(ctx, res); + + dplane_provider_enqueue_out_ctx(prov, ctx); + } + + /* Ensure that we'll run the work loop again if there's still + * more work to do. + */ + if (counter >= limit) { + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane provider '%s' reached max updates %d", + dplane_provider_get_name(prov), counter); + + atomic_fetch_add_explicit(&zdplane_info.dg_update_yields, + 1, memory_order_relaxed); + + dplane_provider_work_ready(); + } + + return 0; +} + +#if DPLANE_TEST_PROVIDER + +/* + * Test dataplane provider plugin + */ + +/* + * Test provider process callback + */ +static int test_dplane_process_func(struct zebra_dplane_provider *prov) +{ + struct zebra_dplane_ctx *ctx; + int counter, limit; + + /* Just moving from 'in' queue to 'out' queue */ + + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane provider '%s': processing", + dplane_provider_get_name(prov)); + + limit = dplane_provider_get_work_limit(prov); + + for (counter = 0; counter < limit; counter++) { + + ctx = dplane_provider_dequeue_in_ctx(prov); + if (ctx == NULL) + break; - /* TODO -- start dataplane pthread. We're using the zebra - * core/main thread temporarily + dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS); + + dplane_provider_enqueue_out_ctx(prov, ctx); + } + + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane provider '%s': processed %d", + dplane_provider_get_name(prov), counter); + + /* Ensure that we'll run the work loop again if there's still + * more work to do. */ - zdplane_info.dg_master = zebra->master; + if (counter >= limit) + dplane_provider_work_ready(); + + return 0; +} + +/* + * Test provider shutdown/fini callback + */ +static int test_dplane_shutdown_func(struct zebra_dplane_provider *prov, + bool early) +{ + if (IS_ZEBRA_DEBUG_DPLANE) + zlog_debug("dplane provider '%s': %sshutdown", + dplane_provider_get_name(prov), + early ? "early " : ""); + + return 0; +} +#endif /* DPLANE_TEST_PROVIDER */ + +/* + * Register default kernel provider + */ +static void dplane_provider_init(void) +{ + int ret; + + ret = dplane_provider_register("Kernel", + DPLANE_PRIO_KERNEL, + DPLANE_PROV_FLAGS_DEFAULT, + kernel_dplane_process_func, + NULL, + NULL); + + if (ret != AOK) + zlog_err("Unable to register kernel dplane provider: %d", + ret); + +#if DPLANE_TEST_PROVIDER + /* Optional test provider ... */ + ret = dplane_provider_register("Test", + DPLANE_PRIO_PRE_KERNEL, + DPLANE_PROV_FLAGS_DEFAULT, + test_dplane_process_func, + test_dplane_shutdown_func, + NULL /* data */); + + if (ret != AOK) + zlog_err("Unable to register test dplane provider: %d", + ret); +#endif /* DPLANE_TEST_PROVIDER */ } /* Indicates zebra shutdown/exit is in progress. Some operations may be @@ -1026,7 +1391,7 @@ bool dplane_is_in_shutdown(void) * early during zebra shutdown, as a signal to stop new work and prepare * for updates generated by shutdown/cleanup activity, as zebra tries to * remove everything it's responsible for. - * NB: This runs in the main zebra thread context. + * NB: This runs in the main zebra pthread context. */ void zebra_dplane_pre_finish(void) { @@ -1035,7 +1400,7 @@ void zebra_dplane_pre_finish(void) zdplane_info.dg_is_shutdown = true; - /* Notify provider(s) of pending shutdown */ + /* TODO -- Notify provider(s) of pending shutdown */ } /* @@ -1044,16 +1409,48 @@ void zebra_dplane_pre_finish(void) */ static bool dplane_work_pending(void) { + bool ret = false; struct zebra_dplane_ctx *ctx; + struct zebra_dplane_provider *prov; - /* TODO -- just checking incoming/pending work for now */ + /* TODO -- just checking incoming/pending work for now, must check + * providers + */ DPLANE_LOCK(); { ctx = TAILQ_FIRST(&zdplane_info.dg_route_ctx_q); + prov = TAILQ_FIRST(&zdplane_info.dg_providers_q); } DPLANE_UNLOCK(); - return (ctx != NULL); + if (ctx != NULL) { + ret = true; + goto done; + } + + while (prov) { + + dplane_provider_lock(prov); + + ctx = TAILQ_FIRST(&(prov->dp_ctx_in_q)); + if (ctx == NULL) + ctx = TAILQ_FIRST(&(prov->dp_ctx_out_q)); + + dplane_provider_unlock(prov); + + if (ctx != NULL) + break; + + DPLANE_LOCK(); + prov = TAILQ_NEXT(prov, dp_prov_link); + DPLANE_UNLOCK(); + } + + if (ctx != NULL) + ret = true; + +done: + return ret; } /* @@ -1108,6 +1505,205 @@ void zebra_dplane_finish(void) } /* + * Main dataplane pthread event loop. The thread takes new incoming work + * and offers it to the first provider. It then iterates through the + * providers, taking complete work from each one and offering it + * to the next in order. At each step, a limited number of updates are + * processed during a cycle in order to provide some fairness. + * + * This loop through the providers is only run once, so that the dataplane + * pthread can look for other pending work - such as i/o work on behalf of + * providers. + */ +static int dplane_thread_loop(struct thread *event) +{ + struct dplane_ctx_q work_list; + struct dplane_ctx_q error_list; + struct zebra_dplane_provider *prov; + struct zebra_dplane_ctx *ctx, *tctx; + int limit, counter, error_counter; + uint64_t curr, high; + + /* Capture work limit per cycle */ + limit = zdplane_info.dg_updates_per_cycle; + + /* Init temporary lists used to move contexts among providers */ + TAILQ_INIT(&work_list); + TAILQ_INIT(&error_list); + error_counter = 0; + + /* Check for zebra shutdown */ + if (!zdplane_info.dg_run) + goto done; + + /* Dequeue some incoming work from zebra (if any) onto the temporary + * working list. + */ + DPLANE_LOCK(); + + /* Locate initial registered provider */ + prov = TAILQ_FIRST(&zdplane_info.dg_providers_q); + + /* Move new work from incoming list to temp list */ + for (counter = 0; counter < limit; counter++) { + ctx = TAILQ_FIRST(&zdplane_info.dg_route_ctx_q); + if (ctx) { + TAILQ_REMOVE(&zdplane_info.dg_route_ctx_q, ctx, + zd_q_entries); + + ctx->zd_provider = prov->dp_id; + + TAILQ_INSERT_TAIL(&work_list, ctx, zd_q_entries); + } else { + break; + } + } + + DPLANE_UNLOCK(); + + atomic_fetch_sub_explicit(&zdplane_info.dg_routes_queued, counter, + memory_order_relaxed); + + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane: incoming new work counter: %d", counter); + + /* Iterate through the registered providers, offering new incoming + * work. If the provider has outgoing work in its queue, take that + * work for the next provider + */ + while (prov) { + + /* At each iteration, the temporary work list has 'counter' + * items. + */ + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane enqueues %d new work to provider '%s'", + counter, dplane_provider_get_name(prov)); + + /* Capture current provider id in each context; check for + * error status. + */ + TAILQ_FOREACH_SAFE(ctx, &work_list, zd_q_entries, tctx) { + if (dplane_ctx_get_status(ctx) == + ZEBRA_DPLANE_REQUEST_SUCCESS) { + ctx->zd_provider = prov->dp_id; + } else { + /* + * TODO -- improve error-handling: recirc + * errors backwards so that providers can + * 'undo' their work (if they want to) + */ + + /* Move to error list; will be returned + * zebra main. + */ + TAILQ_REMOVE(&work_list, ctx, zd_q_entries); + TAILQ_INSERT_TAIL(&error_list, + ctx, zd_q_entries); + error_counter++; + } + } + + /* Enqueue new work to the provider */ + dplane_provider_lock(prov); + + if (TAILQ_FIRST(&work_list)) + TAILQ_CONCAT(&(prov->dp_ctx_in_q), &work_list, + zd_q_entries); + + atomic_fetch_add_explicit(&prov->dp_in_counter, counter, + memory_order_relaxed); + atomic_fetch_add_explicit(&prov->dp_in_queued, counter, + memory_order_relaxed); + curr = atomic_load_explicit(&prov->dp_in_queued, + memory_order_relaxed); + high = atomic_load_explicit(&prov->dp_in_max, + memory_order_relaxed); + if (curr > high) + atomic_store_explicit(&prov->dp_in_max, curr, + memory_order_relaxed); + + dplane_provider_unlock(prov); + + /* Reset the temp list (though the 'concat' may have done this + * already), and the counter + */ + TAILQ_INIT(&work_list); + counter = 0; + + /* Call into the provider code. Note that this is + * unconditional: we offer to do work even if we don't enqueue + * any _new_ work. + */ + (*prov->dp_fp)(prov); + + /* Check for zebra shutdown */ + if (!zdplane_info.dg_run) + break; + + /* Dequeue completed work from the provider */ + dplane_provider_lock(prov); + + while (counter < limit) { + ctx = TAILQ_FIRST(&(prov->dp_ctx_out_q)); + if (ctx) { + TAILQ_REMOVE(&(prov->dp_ctx_out_q), ctx, + zd_q_entries); + + TAILQ_INSERT_TAIL(&work_list, + ctx, zd_q_entries); + counter++; + } else + break; + } + + dplane_provider_unlock(prov); + + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane dequeues %d completed work from provider %s", + counter, dplane_provider_get_name(prov)); + + /* Locate next provider */ + DPLANE_LOCK(); + prov = TAILQ_NEXT(prov, dp_prov_link); + DPLANE_UNLOCK(); + } + + /* After all providers have been serviced, enqueue any completed + * work and any errors back to zebra so it can process the results. + */ + if (IS_ZEBRA_DEBUG_DPLANE_DETAIL) + zlog_debug("dplane has %d completed, %d errors, for zebra main", + counter, error_counter); + + /* + * TODO -- I'd rather hand lists through the api to zebra main, + * to reduce the number of lock/unlock cycles + */ + for (ctx = TAILQ_FIRST(&error_list); ctx; ) { + TAILQ_REMOVE(&error_list, ctx, zd_q_entries); + + /* Call through to zebra main */ + (*zdplane_info.dg_results_cb)(ctx); + + ctx = TAILQ_FIRST(&error_list); + } + + + for (ctx = TAILQ_FIRST(&work_list); ctx; ) { + TAILQ_REMOVE(&work_list, ctx, zd_q_entries); + + /* Call through to zebra main */ + (*zdplane_info.dg_results_cb)(ctx); + + ctx = TAILQ_FIRST(&work_list); + } + +done: + return 0; +} + +/* * Final phase of shutdown, after all work enqueued to dplane has been * processed. This is called from the zebra main pthread context. */ @@ -1122,14 +1718,65 @@ void zebra_dplane_shutdown(void) THREAD_OFF(zdplane_info.dg_t_update); - /* TODO */ - /* frr_pthread_stop(...) */ + frr_pthread_stop(zdplane_info.dg_pthread, NULL); + + /* Destroy pthread */ + frr_pthread_destroy(zdplane_info.dg_pthread); + zdplane_info.dg_pthread = NULL; + zdplane_info.dg_master = NULL; + + /* TODO -- Notify provider(s) of final shutdown */ + + /* TODO -- Clean-up provider objects */ + + /* TODO -- Clean queue(s), free memory */ +} + +/* + * Initialize the dataplane module during startup, internal/private version + */ +static void zebra_dplane_init_internal(struct zebra_t *zebra) +{ + memset(&zdplane_info, 0, sizeof(zdplane_info)); + + pthread_mutex_init(&zdplane_info.dg_mutex, NULL); + + TAILQ_INIT(&zdplane_info.dg_route_ctx_q); + TAILQ_INIT(&zdplane_info.dg_providers_q); + + zdplane_info.dg_updates_per_cycle = DPLANE_DEFAULT_NEW_WORK; + + zdplane_info.dg_max_queued_updates = DPLANE_DEFAULT_MAX_QUEUED; + + /* Register default kernel 'provider' during init */ + dplane_provider_init(); +} + +/* + * Start the dataplane pthread. This step needs to be run later than the + * 'init' step, in case zebra has fork-ed. + */ +void zebra_dplane_start(void) +{ + /* Start dataplane pthread */ + + struct frr_pthread_attr pattr = { + .start = frr_pthread_attr_default.start, + .stop = frr_pthread_attr_default.stop + }; + + zdplane_info.dg_pthread = frr_pthread_new(&pattr, "Zebra dplane thread", + "Zebra dplane"); - /* Notify provider(s) of final shutdown */ + zdplane_info.dg_master = zdplane_info.dg_pthread->master; - /* Clean-up provider objects */ + zdplane_info.dg_run = true; + + /* Enqueue an initial event for the dataplane pthread */ + thread_add_event(zdplane_info.dg_master, dplane_thread_loop, NULL, 0, + &zdplane_info.dg_t_update); - /* Clean queue(s) */ + frr_pthread_run(zdplane_info.dg_pthread, NULL); } /* |
