]> git.puffer.fish Git - matthieu/frr.git/commitdiff
lib: cleanup the work queue implementation
authorJorge Boncompte <jbonor@gmail.com>
Tue, 8 Aug 2017 18:32:30 +0000 (20:32 +0200)
committerJorge Boncompte <jbonor@gmail.com>
Thu, 17 Aug 2017 15:47:07 +0000 (17:47 +0200)
Convert the work queue implementation to not use the generic linked list
to mantain the item list and use instead a simple queue from queue.h that
does not allocate memory for each node.

Signed-off-by: Jorge Boncompte <jbonor@gmail.com>
lib/freebsd-queue.h
lib/workqueue.c
lib/workqueue.h
zebra/zebra_rib.c

index 658b602ba3f740d4a2672f6c633dc0e36ac9e746..d198f5674f33b0a036118f1d83d5edc7a10e4047 100644 (file)
@@ -302,7 +302,7 @@ struct qm_trace {
        (STAILQ_EMPTY((head))                                                  \
                 ? NULL                                                        \
                 : ((struct type *)(void *)((char *)((head)->stqh_last)        \
-                                           - __offsetof(struct type,          \
+                                           - offsetof(struct type,            \
                                                         field))))
 
 #define        STAILQ_NEXT(elm, field) ((elm)->field.stqe_next)
index 612421c80b5fd723696fcdbf5d67d0f9956222f0..b76b73b367b5484379f0be7fa4b60fa53f576b01 100644 (file)
@@ -72,14 +72,7 @@ struct work_queue *work_queue_new(struct thread_master *m,
        new->master = m;
        SET_FLAG(new->flags, WQ_UNPLUGGED);
 
-       if ((new->items = list_new()) == NULL) {
-               XFREE(MTYPE_WORK_QUEUE_NAME, new->name);
-               XFREE(MTYPE_WORK_QUEUE, new);
-
-               return NULL;
-       }
-
-       new->items->del = (void (*)(void *))work_queue_item_free;
+       STAILQ_INIT(&new->items);
 
        listnode_add(work_queues, new);
 
@@ -97,8 +90,6 @@ void work_queue_free(struct work_queue *wq)
        if (wq->thread != NULL)
                thread_cancel(wq->thread);
 
-       /* list_delete frees items via callback */
-       list_delete(wq->items);
        listnode_delete(work_queues, wq);
 
        XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
@@ -114,8 +105,8 @@ bool work_queue_is_scheduled(struct work_queue *wq)
 static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
 {
        /* if appropriate, schedule work queue thread */
-       if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL)
-           && (listcount(wq->items) > 0)) {
+       if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL) &&
+           !work_queue_empty(wq)) {
                wq->thread = NULL;
                thread_add_timer_msec(wq->master, work_queue_run, wq, delay,
                                      &wq->thread);
@@ -139,33 +130,35 @@ void work_queue_add(struct work_queue *wq, void *data)
        }
 
        item->data = data;
-       listnode_add(wq->items, item);
+       work_queue_item_enqueue(wq, item);
 
        work_queue_schedule(wq, wq->spec.hold);
 
        return;
 }
 
-static void work_queue_item_remove(struct work_queue *wq, struct listnode *ln)
+static void work_queue_item_remove(struct work_queue *wq,
+                                  struct work_queue_item *item)
 {
-       struct work_queue_item *item = listgetdata(ln);
-
        assert(item && item->data);
 
        /* call private data deletion callback if needed */
        if (wq->spec.del_item_data)
                wq->spec.del_item_data(wq, item->data);
 
-       list_delete_node(wq->items, ln);
+       work_queue_item_dequeue(wq, item);
+
        work_queue_item_free(item);
 
        return;
 }
 
-static void work_queue_item_requeue(struct work_queue *wq, struct listnode *ln)
+static void work_queue_item_requeue(struct work_queue *wq, struct work_queue_item *item)
 {
-       LISTNODE_DETACH(wq->items, ln);
-       LISTNODE_ATTACH(wq->items, ln); /* attach to end of list */
+       work_queue_item_dequeue(wq, item);
+
+       /* attach to end of list */
+       work_queue_item_enqueue(wq, item);
 }
 
 DEFUN (show_work_queues,
@@ -186,7 +179,7 @@ DEFUN (show_work_queues,
        for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) {
                vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n",
                        (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
-                       listcount(wq->items), wq->spec.hold, wq->runs,
+                       work_queue_item_count(wq), wq->spec.hold, wq->runs,
                        wq->yields, wq->cycles.best, wq->cycles.granularity,
                        wq->cycles.total,
                        (wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs)
@@ -233,16 +226,15 @@ void work_queue_unplug(struct work_queue *wq)
 int work_queue_run(struct thread *thread)
 {
        struct work_queue *wq;
-       struct work_queue_item *item;
+       struct work_queue_item *item, *titem;
        wq_item_status ret;
        unsigned int cycles = 0;
-       struct listnode *node, *nnode;
        char yielded = 0;
 
        wq = THREAD_ARG(thread);
        wq->thread = NULL;
 
-       assert(wq && wq->items);
+       assert(wq);
 
        /* calculate cycle granularity:
         * list iteration == 1 run
@@ -266,7 +258,7 @@ int work_queue_run(struct thread *thread)
        if (wq->cycles.granularity == 0)
                wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
 
-       for (ALL_LIST_ELEMENTS(wq->items, node, nnode, item)) {
+       STAILQ_FOREACH_SAFE(item, &wq->items, wq, titem) {
                assert(item && item->data);
 
                /* dont run items which are past their allowed retries */
@@ -274,7 +266,7 @@ int work_queue_run(struct thread *thread)
                        /* run error handler, if any */
                        if (wq->spec.errorfunc)
                                wq->spec.errorfunc(wq, item->data);
-                       work_queue_item_remove(wq, node);
+                       work_queue_item_remove(wq, item);
                        continue;
                }
 
@@ -298,7 +290,7 @@ int work_queue_run(struct thread *thread)
                }
                case WQ_REQUEUE: {
                        item->ran--;
-                       work_queue_item_requeue(wq, node);
+                       work_queue_item_requeue(wq, item);
                        /* If a single node is being used with a meta-queue
                         * (e.g., zebra),
                         * update the next node as we don't want to exit the
@@ -309,8 +301,8 @@ int work_queue_run(struct thread *thread)
                         * will kick in
                         * to terminate the thread when time has exceeded.
                         */
-                       if (nnode == NULL)
-                               nnode = node;
+                       if (titem == NULL)
+                               titem = item;
                        break;
                }
                case WQ_RETRY_NOW:
@@ -323,7 +315,7 @@ int work_queue_run(struct thread *thread)
                /* fallthru */
                case WQ_SUCCESS:
                default: {
-                       work_queue_item_remove(wq, node);
+                       work_queue_item_remove(wq, item);
                        break;
                }
                }
@@ -376,7 +368,7 @@ stats:
 #endif
 
        /* Is the queue done yet? If it is, call the completion callback. */
-       if (listcount(wq->items) > 0)
+       if (!work_queue_empty(wq))
                work_queue_schedule(wq, 0);
        else if (wq->spec.completion_func)
                wq->spec.completion_func(wq);
index ff7f57690dd93721be68b5964fbe149508e0d1a1..df35d44fbcb9a67d48c33c9cf01e77992e8afbb0 100644 (file)
@@ -24,6 +24,7 @@
 #define _QUAGGA_WORK_QUEUE_H
 
 #include "memory.h"
+#include "queue.h"
 DECLARE_MTYPE(WORK_QUEUE)
 
 /* Hold time for the initial schedule of a queue run, in  millisec */
@@ -43,6 +44,7 @@ typedef enum {
 
 /* A single work queue item, unsurprisingly */
 struct work_queue_item {
+       STAILQ_ENTRY(work_queue_item) wq;
        void *data;      /* opaque data */
        unsigned short ran; /* # of times item has been run */
 };
@@ -91,7 +93,8 @@ struct work_queue {
        } spec;
 
        /* remaining fields should be opaque to users */
-       struct list *items;   /* queue item list */
+       STAILQ_HEAD(work_queue_items, work_queue_item) items; /* queue item list */
+       int item_count; /* queued items */
        unsigned long runs;   /* runs count */
        unsigned long yields; /* yields count */
 
@@ -107,6 +110,37 @@ struct work_queue {
 
 /* User API */
 
+static inline int work_queue_item_count(struct work_queue *wq)
+{
+       return wq->item_count;
+}
+
+static inline bool work_queue_empty(struct work_queue *wq)
+{
+       return (wq->item_count == 0) ? true : false;
+}
+
+static inline struct work_queue_item *work_queue_last_item(struct work_queue *wq)
+{
+       return STAILQ_LAST(&wq->items, work_queue_item, wq);
+}
+
+static inline void work_queue_item_enqueue(struct work_queue *wq,
+                                          struct work_queue_item *item)
+{
+       STAILQ_INSERT_TAIL(&wq->items, item, wq);
+       wq->item_count++;
+}
+
+static inline void work_queue_item_dequeue(struct work_queue *wq,
+                                          struct work_queue_item *item)
+{
+       assert(wq->item_count > 0);
+
+       wq->item_count--;
+       STAILQ_REMOVE(&wq->items, item, work_queue_item, wq);
+}
+
 /* create a new work queue, of given name.
  * user must fill in the spec of the returned work queue before adding
  * anything to it
index ed5355426524bdb528654c5bca083d8f061f5137..b27201616dafaee60996a5f323a50b7761276c8e 100644 (file)
@@ -1820,7 +1820,7 @@ void rib_queue_add(struct route_node *rn)
         * holder, if necessary, then push the work into it in any case.
         * This semantics was introduced after 0.99.9 release.
         */
-       if (!zebrad.ribq->items->count)
+       if (work_queue_empty(zebrad.ribq))
                work_queue_add(zebrad.ribq, zebrad.mq);
 
        rib_meta_queue_add(zebrad.mq, rn);