summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJorge Boncompte <jbonor@gmail.com>2017-08-08 20:32:30 +0200
committerJorge Boncompte <jbonor@gmail.com>2017-08-17 17:47:07 +0200
commitf104f6c1a62c92a4420e1d99abb336d566c1ff51 (patch)
treeb3eceba4d39ae56d9e8eff27e4a2bb16f1024b82
parentcd85bc2e0bd8657cb2555fe8d631a2b0d916588e (diff)
lib: cleanup the work queue implementation
Convert the work queue implementation to not use the generic linked list to mantain the item list and use instead a simple queue from queue.h that does not allocate memory for each node. Signed-off-by: Jorge Boncompte <jbonor@gmail.com>
-rw-r--r--lib/freebsd-queue.h2
-rw-r--r--lib/workqueue.c54
-rw-r--r--lib/workqueue.h36
-rw-r--r--zebra/zebra_rib.c2
4 files changed, 60 insertions, 34 deletions
diff --git a/lib/freebsd-queue.h b/lib/freebsd-queue.h
index 658b602ba3..d198f5674f 100644
--- a/lib/freebsd-queue.h
+++ b/lib/freebsd-queue.h
@@ -302,7 +302,7 @@ struct qm_trace {
(STAILQ_EMPTY((head)) \
? NULL \
: ((struct type *)(void *)((char *)((head)->stqh_last) \
- - __offsetof(struct type, \
+ - offsetof(struct type, \
field))))
#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next)
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 612421c80b..b76b73b367 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -72,14 +72,7 @@ struct work_queue *work_queue_new(struct thread_master *m,
new->master = m;
SET_FLAG(new->flags, WQ_UNPLUGGED);
- if ((new->items = list_new()) == NULL) {
- XFREE(MTYPE_WORK_QUEUE_NAME, new->name);
- XFREE(MTYPE_WORK_QUEUE, new);
-
- return NULL;
- }
-
- new->items->del = (void (*)(void *))work_queue_item_free;
+ STAILQ_INIT(&new->items);
listnode_add(work_queues, new);
@@ -97,8 +90,6 @@ void work_queue_free(struct work_queue *wq)
if (wq->thread != NULL)
thread_cancel(wq->thread);
- /* list_delete frees items via callback */
- list_delete(wq->items);
listnode_delete(work_queues, wq);
XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
@@ -114,8 +105,8 @@ bool work_queue_is_scheduled(struct work_queue *wq)
static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
{
/* if appropriate, schedule work queue thread */
- if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL)
- && (listcount(wq->items) > 0)) {
+ if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL) &&
+ !work_queue_empty(wq)) {
wq->thread = NULL;
thread_add_timer_msec(wq->master, work_queue_run, wq, delay,
&wq->thread);
@@ -139,33 +130,35 @@ void work_queue_add(struct work_queue *wq, void *data)
}
item->data = data;
- listnode_add(wq->items, item);
+ work_queue_item_enqueue(wq, item);
work_queue_schedule(wq, wq->spec.hold);
return;
}
-static void work_queue_item_remove(struct work_queue *wq, struct listnode *ln)
+static void work_queue_item_remove(struct work_queue *wq,
+ struct work_queue_item *item)
{
- struct work_queue_item *item = listgetdata(ln);
-
assert(item && item->data);
/* call private data deletion callback if needed */
if (wq->spec.del_item_data)
wq->spec.del_item_data(wq, item->data);
- list_delete_node(wq->items, ln);
+ work_queue_item_dequeue(wq, item);
+
work_queue_item_free(item);
return;
}
-static void work_queue_item_requeue(struct work_queue *wq, struct listnode *ln)
+static void work_queue_item_requeue(struct work_queue *wq, struct work_queue_item *item)
{
- LISTNODE_DETACH(wq->items, ln);
- LISTNODE_ATTACH(wq->items, ln); /* attach to end of list */
+ work_queue_item_dequeue(wq, item);
+
+ /* attach to end of list */
+ work_queue_item_enqueue(wq, item);
}
DEFUN (show_work_queues,
@@ -186,7 +179,7 @@ DEFUN (show_work_queues,
for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) {
vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n",
(CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
- listcount(wq->items), wq->spec.hold, wq->runs,
+ work_queue_item_count(wq), wq->spec.hold, wq->runs,
wq->yields, wq->cycles.best, wq->cycles.granularity,
wq->cycles.total,
(wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs)
@@ -233,16 +226,15 @@ void work_queue_unplug(struct work_queue *wq)
int work_queue_run(struct thread *thread)
{
struct work_queue *wq;
- struct work_queue_item *item;
+ struct work_queue_item *item, *titem;
wq_item_status ret;
unsigned int cycles = 0;
- struct listnode *node, *nnode;
char yielded = 0;
wq = THREAD_ARG(thread);
wq->thread = NULL;
- assert(wq && wq->items);
+ assert(wq);
/* calculate cycle granularity:
* list iteration == 1 run
@@ -266,7 +258,7 @@ int work_queue_run(struct thread *thread)
if (wq->cycles.granularity == 0)
wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
- for (ALL_LIST_ELEMENTS(wq->items, node, nnode, item)) {
+ STAILQ_FOREACH_SAFE(item, &wq->items, wq, titem) {
assert(item && item->data);
/* dont run items which are past their allowed retries */
@@ -274,7 +266,7 @@ int work_queue_run(struct thread *thread)
/* run error handler, if any */
if (wq->spec.errorfunc)
wq->spec.errorfunc(wq, item->data);
- work_queue_item_remove(wq, node);
+ work_queue_item_remove(wq, item);
continue;
}
@@ -298,7 +290,7 @@ int work_queue_run(struct thread *thread)
}
case WQ_REQUEUE: {
item->ran--;
- work_queue_item_requeue(wq, node);
+ work_queue_item_requeue(wq, item);
/* If a single node is being used with a meta-queue
* (e.g., zebra),
* update the next node as we don't want to exit the
@@ -309,8 +301,8 @@ int work_queue_run(struct thread *thread)
* will kick in
* to terminate the thread when time has exceeded.
*/
- if (nnode == NULL)
- nnode = node;
+ if (titem == NULL)
+ titem = item;
break;
}
case WQ_RETRY_NOW:
@@ -323,7 +315,7 @@ int work_queue_run(struct thread *thread)
/* fallthru */
case WQ_SUCCESS:
default: {
- work_queue_item_remove(wq, node);
+ work_queue_item_remove(wq, item);
break;
}
}
@@ -376,7 +368,7 @@ stats:
#endif
/* Is the queue done yet? If it is, call the completion callback. */
- if (listcount(wq->items) > 0)
+ if (!work_queue_empty(wq))
work_queue_schedule(wq, 0);
else if (wq->spec.completion_func)
wq->spec.completion_func(wq);
diff --git a/lib/workqueue.h b/lib/workqueue.h
index ff7f57690d..df35d44fbc 100644
--- a/lib/workqueue.h
+++ b/lib/workqueue.h
@@ -24,6 +24,7 @@
#define _QUAGGA_WORK_QUEUE_H
#include "memory.h"
+#include "queue.h"
DECLARE_MTYPE(WORK_QUEUE)
/* Hold time for the initial schedule of a queue run, in millisec */
@@ -43,6 +44,7 @@ typedef enum {
/* A single work queue item, unsurprisingly */
struct work_queue_item {
+ STAILQ_ENTRY(work_queue_item) wq;
void *data; /* opaque data */
unsigned short ran; /* # of times item has been run */
};
@@ -91,7 +93,8 @@ struct work_queue {
} spec;
/* remaining fields should be opaque to users */
- struct list *items; /* queue item list */
+ STAILQ_HEAD(work_queue_items, work_queue_item) items; /* queue item list */
+ int item_count; /* queued items */
unsigned long runs; /* runs count */
unsigned long yields; /* yields count */
@@ -107,6 +110,37 @@ struct work_queue {
/* User API */
+static inline int work_queue_item_count(struct work_queue *wq)
+{
+ return wq->item_count;
+}
+
+static inline bool work_queue_empty(struct work_queue *wq)
+{
+ return (wq->item_count == 0) ? true : false;
+}
+
+static inline struct work_queue_item *work_queue_last_item(struct work_queue *wq)
+{
+ return STAILQ_LAST(&wq->items, work_queue_item, wq);
+}
+
+static inline void work_queue_item_enqueue(struct work_queue *wq,
+ struct work_queue_item *item)
+{
+ STAILQ_INSERT_TAIL(&wq->items, item, wq);
+ wq->item_count++;
+}
+
+static inline void work_queue_item_dequeue(struct work_queue *wq,
+ struct work_queue_item *item)
+{
+ assert(wq->item_count > 0);
+
+ wq->item_count--;
+ STAILQ_REMOVE(&wq->items, item, work_queue_item, wq);
+}
+
/* create a new work queue, of given name.
* user must fill in the spec of the returned work queue before adding
* anything to it
diff --git a/zebra/zebra_rib.c b/zebra/zebra_rib.c
index ed53554265..b27201616d 100644
--- a/zebra/zebra_rib.c
+++ b/zebra/zebra_rib.c
@@ -1820,7 +1820,7 @@ void rib_queue_add(struct route_node *rn)
* holder, if necessary, then push the work into it in any case.
* This semantics was introduced after 0.99.9 release.
*/
- if (!zebrad.ribq->items->count)
+ if (work_queue_empty(zebrad.ribq))
work_queue_add(zebrad.ribq, zebrad.mq);
rib_meta_queue_add(zebrad.mq, rn);