From f104f6c1a62c92a4420e1d99abb336d566c1ff51 Mon Sep 17 00:00:00 2001
From: Jorge Boncompte
Date: Tue, 8 Aug 2017 20:32:30 +0200
Subject: [PATCH] lib: cleanup the work queue implementation

Convert the work queue implementation to not use the generic linked
list to maintain the item list; instead, use a simple queue from
queue.h that does not allocate memory for each node.

Signed-off-by: Jorge Boncompte
---
 lib/freebsd-queue.h |  2 +-
 lib/workqueue.c     | 54 +++++++++++++++++++--------------------------
 lib/workqueue.h     | 36 +++++++++++++++++++++++++++++-
 zebra/zebra_rib.c   |  2 +-
 4 files changed, 60 insertions(+), 34 deletions(-)

diff --git a/lib/freebsd-queue.h b/lib/freebsd-queue.h
index 658b602ba3..d198f5674f 100644
--- a/lib/freebsd-queue.h
+++ b/lib/freebsd-queue.h
@@ -302,7 +302,7 @@ struct qm_trace {
 	(STAILQ_EMPTY((head))                                                  \
 		 ? NULL                                                        \
 		 : ((struct type *)(void *)((char *)((head)->stqh_last)        \
-					    - __offsetof(struct type,          \
+					    - offsetof(struct type,            \
 							 field))))
 
 #define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next)
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 612421c80b..b76b73b367 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -72,14 +72,7 @@ struct work_queue *work_queue_new(struct thread_master *m,
 	new->master = m;
 	SET_FLAG(new->flags, WQ_UNPLUGGED);
 
-	if ((new->items = list_new()) == NULL) {
-		XFREE(MTYPE_WORK_QUEUE_NAME, new->name);
-		XFREE(MTYPE_WORK_QUEUE, new);
-
-		return NULL;
-	}
-
-	new->items->del = (void (*)(void *))work_queue_item_free;
+	STAILQ_INIT(&new->items);
 
 	listnode_add(work_queues, new);
 
@@ -97,8 +90,6 @@ void work_queue_free(struct work_queue *wq)
 	if (wq->thread != NULL)
 		thread_cancel(wq->thread);
 
-	/* list_delete frees items via callback */
-	list_delete(wq->items);
 	listnode_delete(work_queues, wq);
 
 	XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
@@ -114,8 +105,8 @@ bool work_queue_is_scheduled(struct work_queue *wq)
 static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
 {
 	/* if appropriate, schedule work queue thread */
-	if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL)
-	    && (listcount(wq->items) > 0)) {
+	if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL) &&
+	    !work_queue_empty(wq)) {
 		wq->thread = NULL;
 		thread_add_timer_msec(wq->master, work_queue_run, wq, delay,
 				      &wq->thread);
@@ -139,33 +130,35 @@ void work_queue_add(struct work_queue *wq, void *data)
 	}
 
 	item->data = data;
-	listnode_add(wq->items, item);
+	work_queue_item_enqueue(wq, item);
 
 	work_queue_schedule(wq, wq->spec.hold);
 
 	return;
 }
 
-static void work_queue_item_remove(struct work_queue *wq, struct listnode *ln)
+static void work_queue_item_remove(struct work_queue *wq,
+				   struct work_queue_item *item)
 {
-	struct work_queue_item *item = listgetdata(ln);
-
 	assert(item && item->data);
 
 	/* call private data deletion callback if needed */
 	if (wq->spec.del_item_data)
 		wq->spec.del_item_data(wq, item->data);
 
-	list_delete_node(wq->items, ln);
+	work_queue_item_dequeue(wq, item);
+
+	work_queue_item_free(item);
 
 	return;
 }
 
-static void work_queue_item_requeue(struct work_queue *wq, struct listnode *ln)
+static void work_queue_item_requeue(struct work_queue *wq,
+				    struct work_queue_item *item)
 {
-	LISTNODE_DETACH(wq->items, ln);
-	LISTNODE_ATTACH(wq->items, ln); /* attach to end of list */
+	work_queue_item_dequeue(wq, item);
+
+	/* attach to end of list */
+	work_queue_item_enqueue(wq, item);
 }
 
 DEFUN (show_work_queues,
@@ -186,7 +179,7 @@ DEFUN (show_work_queues,
 	for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) {
 		vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n",
 			(CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
-			listcount(wq->items), wq->spec.hold, wq->runs,
+			work_queue_item_count(wq), wq->spec.hold, wq->runs,
 			wq->yields, wq->cycles.best, wq->cycles.granularity,
 			wq->cycles.total,
 			(wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs)
@@ -233,16 +226,15 @@ void work_queue_unplug(struct work_queue *wq)
 int work_queue_run(struct thread *thread)
 {
 	struct work_queue *wq;
-	struct work_queue_item *item;
+	struct work_queue_item *item, *titem;
 	wq_item_status ret;
 	unsigned int cycles = 0;
-	struct listnode *node, *nnode;
 	char yielded = 0;
 
 	wq = THREAD_ARG(thread);
 	wq->thread = NULL;
 
-	assert(wq && wq->items);
+	assert(wq);
 
 	/* calculate cycle granularity:
 	 * list iteration == 1 run
@@ -266,7 +258,7 @@ int work_queue_run(struct thread *thread)
 	if (wq->cycles.granularity == 0)
 		wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
 
-	for (ALL_LIST_ELEMENTS(wq->items, node, nnode, item)) {
+	STAILQ_FOREACH_SAFE(item, &wq->items, wq, titem) {
 		assert(item && item->data);
 
 		/* dont run items which are past their allowed retries */
@@ -274,7 +266,7 @@ int work_queue_run(struct thread *thread)
 			/* run error handler, if any */
 			if (wq->spec.errorfunc)
 				wq->spec.errorfunc(wq, item->data);
-			work_queue_item_remove(wq, node);
+			work_queue_item_remove(wq, item);
 			continue;
 		}
 
@@ -298,7 +290,7 @@ int work_queue_run(struct thread *thread)
 		}
 		case WQ_REQUEUE: {
 			item->ran--;
-			work_queue_item_requeue(wq, node);
+			work_queue_item_requeue(wq, item);
 			/* If a single node is being used with a meta-queue
 			 * (e.g., zebra),
 			 * update the next node as we don't want to exit the
@@ -309,8 +301,8 @@ int work_queue_run(struct thread *thread)
 			 * will kick in
 			 * to terminate the thread when time has exceeded.
 			 */
-			if (nnode == NULL)
-				nnode = node;
+			if (titem == NULL)
+				titem = item;
 			break;
 		}
 		case WQ_RETRY_NOW:
@@ -323,7 +315,7 @@ int work_queue_run(struct thread *thread)
 		/* fallthru */
 		case WQ_SUCCESS:
 		default: {
-			work_queue_item_remove(wq, node);
+			work_queue_item_remove(wq, item);
 			break;
 		}
 		}
@@ -376,7 +368,7 @@ stats:
 #endif
 
 	/* Is the queue done yet? If it is, call the completion callback. */
-	if (listcount(wq->items) > 0)
+	if (!work_queue_empty(wq))
 		work_queue_schedule(wq, 0);
 	else if (wq->spec.completion_func)
 		wq->spec.completion_func(wq);
diff --git a/lib/workqueue.h b/lib/workqueue.h
index ff7f57690d..df35d44fbc 100644
--- a/lib/workqueue.h
+++ b/lib/workqueue.h
@@ -24,6 +24,7 @@
 #define _QUAGGA_WORK_QUEUE_H
 
 #include "memory.h"
+#include "queue.h"
 DECLARE_MTYPE(WORK_QUEUE)
 
 /* Hold time for the initial schedule of a queue run, in millisec */
@@ -43,6 +44,7 @@ typedef enum {
 
 /* A single work queue item, unsurprisingly */
 struct work_queue_item {
+	STAILQ_ENTRY(work_queue_item) wq;
 	void *data;	    /* opaque data */
 	unsigned short ran; /* # of times item has been run */
 };
@@ -91,7 +93,8 @@ struct work_queue {
 	} spec;
 
 	/* remaining fields should be opaque to users */
-	struct list *items;	  /* queue item list */
+	STAILQ_HEAD(work_queue_items, work_queue_item) items; /* queue item list */
+	int item_count;		  /* queued items */
 	unsigned long runs;	  /* runs count */
 	unsigned long yields;	  /* yields count */
 
@@ -107,6 +110,37 @@ struct work_queue {
 
 /* User API */
 
+static inline int work_queue_item_count(struct work_queue *wq)
+{
+	return wq->item_count;
+}
+
+static inline bool work_queue_empty(struct work_queue *wq)
+{
+	return (wq->item_count == 0) ? true : false;
+}
+
+static inline struct work_queue_item *work_queue_last_item(struct work_queue *wq)
+{
+	return STAILQ_LAST(&wq->items, work_queue_item, wq);
+}
+
+static inline void work_queue_item_enqueue(struct work_queue *wq,
+					   struct work_queue_item *item)
+{
+	STAILQ_INSERT_TAIL(&wq->items, item, wq);
+	wq->item_count++;
+}
+
+static inline void work_queue_item_dequeue(struct work_queue *wq,
+					   struct work_queue_item *item)
+{
+	assert(wq->item_count > 0);
+
+	wq->item_count--;
+	STAILQ_REMOVE(&wq->items, item, work_queue_item, wq);
+}
+
 /* create a new work queue, of given name.
  * user must fill in the spec of the returned work queue before adding
  * anything to it
diff --git a/zebra/zebra_rib.c b/zebra/zebra_rib.c
index ed53554265..b27201616d 100644
--- a/zebra/zebra_rib.c
+++ b/zebra/zebra_rib.c
@@ -1820,7 +1820,7 @@ void rib_queue_add(struct route_node *rn)
 	 * holder, if necessary, then push the work into it in any case.
 	 * This semantics was introduced after 0.99.9 release.
 	 */
-	if (!zebrad.ribq->items->count)
+	if (work_queue_empty(zebrad.ribq))
 		work_queue_add(zebrad.ribq, zebrad.mq);
 
 	rib_meta_queue_add(zebrad.mq, rn);
-- 
2.39.5
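
A note for reviewers unfamiliar with the intrusive-queue pattern this
patch adopts: STAILQ_ENTRY embeds the linkage directly in each
work_queue_item, so enqueue and dequeue are plain pointer updates and
the per-node listnode allocation of the generic list goes away. The
sketch below is a minimal standalone illustration of that pattern, not
part of the patch; it uses the system <sys/queue.h> rather than FRR's
bundled lib/freebsd-queue.h, and the job/job_queue names are invented
for the example.

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/queue.h>

/* An item carries its own linkage, mirroring STAILQ_ENTRY(work_queue_item)
 * in the patched struct work_queue_item. */
struct job {
	STAILQ_ENTRY(job) entries;	/* intrusive linkage, no separate node */
	int id;
};

/* Queue head plus a counter, mirroring the patched struct work_queue. */
struct job_queue {
	STAILQ_HEAD(job_list, job) items;
	int item_count;
};

/* O(1) enqueue: pointer updates only, no allocation for a list node. */
static void job_enqueue(struct job_queue *q, struct job *j)
{
	STAILQ_INSERT_TAIL(&q->items, j, entries);
	q->item_count++;
}

/* Pop from the head; returns NULL when the queue is empty. */
static struct job *job_dequeue(struct job_queue *q)
{
	struct job *j = STAILQ_FIRST(&q->items);

	if (j == NULL)
		return NULL;

	assert(q->item_count > 0);
	STAILQ_REMOVE_HEAD(&q->items, entries);
	q->item_count--;

	return j;
}

int main(void)
{
	struct job_queue q;
	struct job *j;
	int i;

	STAILQ_INIT(&q.items);
	q.item_count = 0;

	for (i = 0; i < 3; i++) {
		/* One allocation per item, and none per queue node. */
		j = calloc(1, sizeof(*j));
		j->id = i;
		job_enqueue(&q, j);
	}

	while ((j = job_dequeue(&q)) != NULL) {
		printf("job %d (%d remaining)\n", j->id, q.item_count);
		free(j);
	}

	return 0;
}

One trade-off worth knowing: removing an arbitrary element with
STAILQ_REMOVE, as the patched work_queue_item_dequeue does, is O(n),
because a singly-linked tail queue must walk from the head to find the
predecessor. That stays cheap here since work_queue_run consumes items
from the front; a doubly-linked TAILQ would make arbitrary removal O(1)
at the cost of one extra pointer per item.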