diff options
Diffstat (limited to 'lib/workqueue.c')
| -rw-r--r-- | lib/workqueue.c | 573 | 
1 files changed, 272 insertions, 301 deletions
diff --git a/lib/workqueue.c b/lib/workqueue.c index 60119f1f41..612421c80b 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -1,4 +1,4 @@ -/*  +/*   * Quagga Work Queue Support.   *   * Copyright (C) 2005 Sun Microsystems, Inc. @@ -28,7 +28,7 @@  #include "command.h"  #include "log.h" -DEFINE_MTYPE(LIB, WORK_QUEUE,             "Work queue") +DEFINE_MTYPE(LIB, WORK_QUEUE, "Work queue")  DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_ITEM, "Work queue item")  DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_NAME, "Work queue name string") @@ -41,145 +41,131 @@ static struct list *work_queues = &_work_queues;  #define WORK_QUEUE_MIN_GRANULARITY 1 -static struct work_queue_item * -work_queue_item_new (struct work_queue *wq) +static struct work_queue_item *work_queue_item_new(struct work_queue *wq)  { -  struct work_queue_item *item; -  assert (wq); +	struct work_queue_item *item; +	assert(wq); -  item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,  -                  sizeof (struct work_queue_item)); -   -  return item; +	item = XCALLOC(MTYPE_WORK_QUEUE_ITEM, sizeof(struct work_queue_item)); + +	return item;  } -static void -work_queue_item_free (struct work_queue_item *item) +static void work_queue_item_free(struct work_queue_item *item)  { -  XFREE (MTYPE_WORK_QUEUE_ITEM, item); -  return; +	XFREE(MTYPE_WORK_QUEUE_ITEM, item); +	return;  }  /* create new work queue */ -struct work_queue * -work_queue_new (struct thread_master *m, const char *queue_name) +struct work_queue *work_queue_new(struct thread_master *m, +				  const char *queue_name)  { -  struct work_queue *new; -   -  new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue)); - -  if (new == NULL) -    return new; -   -  new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name); -  new->master = m; -  SET_FLAG (new->flags, WQ_UNPLUGGED); -   -  if ( (new->items = list_new ()) == NULL) -    { -      XFREE (MTYPE_WORK_QUEUE_NAME, new->name); -      XFREE (MTYPE_WORK_QUEUE, new); -       -      return NULL; -    } -   -  new->items->del = (void (*)(void *)) work_queue_item_free;   -   -  listnode_add (work_queues, new); -   -  new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; - -  /* Default values, can be overriden by caller */ -  new->spec.hold = WORK_QUEUE_DEFAULT_HOLD; -  new->spec.yield = THREAD_YIELD_TIME_SLOT; -     -  return new; +	struct work_queue *new; + +	new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue)); + +	if (new == NULL) +		return new; + +	new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name); +	new->master = m; +	SET_FLAG(new->flags, WQ_UNPLUGGED); + +	if ((new->items = list_new()) == NULL) { +		XFREE(MTYPE_WORK_QUEUE_NAME, new->name); +		XFREE(MTYPE_WORK_QUEUE, new); + +		return NULL; +	} + +	new->items->del = (void (*)(void *))work_queue_item_free; + +	listnode_add(work_queues, new); + +	new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + +	/* Default values, can be overriden by caller */ +	new->spec.hold = WORK_QUEUE_DEFAULT_HOLD; +	new->spec.yield = THREAD_YIELD_TIME_SLOT; + +	return new;  } -void -work_queue_free (struct work_queue *wq) +void work_queue_free(struct work_queue *wq)  { -  if (wq->thread != NULL) -    thread_cancel(wq->thread); -   -  /* list_delete frees items via callback */ -  list_delete (wq->items); -  listnode_delete (work_queues, wq); -   -  XFREE (MTYPE_WORK_QUEUE_NAME, wq->name); -  XFREE (MTYPE_WORK_QUEUE, wq); -  return; +	if (wq->thread != NULL) +		thread_cancel(wq->thread); + +	/* list_delete frees items via callback */ +	list_delete(wq->items); +	listnode_delete(work_queues, wq); + +	XFREE(MTYPE_WORK_QUEUE_NAME, wq->name); +	XFREE(MTYPE_WORK_QUEUE, wq); +	return;  } -bool -work_queue_is_scheduled (struct work_queue *wq) +bool work_queue_is_scheduled(struct work_queue *wq)  { -  return (wq->thread != NULL); +	return (wq->thread != NULL);  } -static int -work_queue_schedule (struct work_queue *wq, unsigned int delay) +static int work_queue_schedule(struct work_queue *wq, unsigned int delay)  { -  /* if appropriate, schedule work queue thread */ -  if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED) -       && (wq->thread == NULL) -       && (listcount (wq->items) > 0) ) -    { -      wq->thread = NULL; -      thread_add_timer_msec (wq->master, work_queue_run, wq, delay, -                             &wq->thread); -      /* set thread yield time, if needed */ -      if (wq->thread && wq->spec.yield != THREAD_YIELD_TIME_SLOT) -        thread_set_yield_time (wq->thread, wq->spec.yield); -      return 1; -    } -  else -    return 0; +	/* if appropriate, schedule work queue thread */ +	if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL) +	    && (listcount(wq->items) > 0)) { +		wq->thread = NULL; +		thread_add_timer_msec(wq->master, work_queue_run, wq, delay, +				      &wq->thread); +		/* set thread yield time, if needed */ +		if (wq->thread && wq->spec.yield != THREAD_YIELD_TIME_SLOT) +			thread_set_yield_time(wq->thread, wq->spec.yield); +		return 1; +	} else +		return 0;  } -   -void -work_queue_add (struct work_queue *wq, void *data) + +void work_queue_add(struct work_queue *wq, void *data)  { -  struct work_queue_item *item; -   -  assert (wq); - -  if (!(item = work_queue_item_new (wq))) -    { -      zlog_err ("%s: unable to get new queue item", __func__); -      return; -    } -   -  item->data = data; -  listnode_add (wq->items, item); -   -  work_queue_schedule (wq, wq->spec.hold); -   -  return; +	struct work_queue_item *item; + +	assert(wq); + +	if (!(item = work_queue_item_new(wq))) { +		zlog_err("%s: unable to get new queue item", __func__); +		return; +	} + +	item->data = data; +	listnode_add(wq->items, item); + +	work_queue_schedule(wq, wq->spec.hold); + +	return;  } -static void -work_queue_item_remove (struct work_queue *wq, struct listnode *ln) +static void work_queue_item_remove(struct work_queue *wq, struct listnode *ln)  { -  struct work_queue_item *item = listgetdata (ln); +	struct work_queue_item *item = listgetdata(ln); -  assert (item && item->data); +	assert(item && item->data); -  /* call private data deletion callback if needed */   -  if (wq->spec.del_item_data) -    wq->spec.del_item_data (wq, item->data); +	/* call private data deletion callback if needed */ +	if (wq->spec.del_item_data) +		wq->spec.del_item_data(wq, item->data); -  list_delete_node (wq->items, ln); -  work_queue_item_free (item); -   -  return; +	list_delete_node(wq->items, ln); +	work_queue_item_free(item); + +	return;  } -static void -work_queue_item_requeue (struct work_queue *wq, struct listnode *ln) +static void work_queue_item_requeue(struct work_queue *wq, struct listnode *ln)  { -  LISTNODE_DETACH (wq->items, ln); -  LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */ +	LISTNODE_DETACH(wq->items, ln); +	LISTNODE_ATTACH(wq->items, ln); /* attach to end of list */  }  DEFUN (show_work_queues, @@ -188,227 +174,212 @@ DEFUN (show_work_queues,         SHOW_STR         "Work Queue information\n")  { -  struct listnode *node; -  struct work_queue *wq; -   -  vty_out (vty,  -           "%c %8s %5s %8s %8s %21s\n", -           ' ', "List","(ms) ","Q. Runs","Yields","Cycle Counts   "); -  vty_out (vty, -           "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", -           'P', -           "Items", -           "Hold", -           "Total","Total", -           "Best","Gran.","Total","Avg.", -           "Name"); -  -  for (ALL_LIST_ELEMENTS_RO (work_queues, node, wq)) -    { -      vty_out (vty,"%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n", -               (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'), -               listcount (wq->items), -               wq->spec.hold, -               wq->runs, wq->yields, -               wq->cycles.best, wq->cycles.granularity, wq->cycles.total, -                 (wq->runs) ?  -                   (unsigned int) (wq->cycles.total / wq->runs) : 0, -               wq->name); -    } -     -  return CMD_SUCCESS; +	struct listnode *node; +	struct work_queue *wq; + +	vty_out(vty, "%c %8s %5s %8s %8s %21s\n", ' ', "List", "(ms) ", +		"Q. Runs", "Yields", "Cycle Counts   "); +	vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", 'P', "Items", +		"Hold", "Total", "Total", "Best", "Gran.", "Total", "Avg.", +		"Name"); + +	for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) { +		vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n", +			(CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'), +			listcount(wq->items), wq->spec.hold, wq->runs, +			wq->yields, wq->cycles.best, wq->cycles.granularity, +			wq->cycles.total, +			(wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs) +				   : 0, +			wq->name); +	} + +	return CMD_SUCCESS;  } -void -workqueue_cmd_init (void) +void workqueue_cmd_init(void)  { -  install_element (VIEW_NODE, &show_work_queues_cmd); +	install_element(VIEW_NODE, &show_work_queues_cmd);  }  /* 'plug' a queue: Stop it from being scheduled,   * ie: prevent the queue from draining.   */ -void -work_queue_plug (struct work_queue *wq) +void work_queue_plug(struct work_queue *wq)  { -  if (wq->thread) -    thread_cancel (wq->thread); -   -  wq->thread = NULL; -   -  UNSET_FLAG (wq->flags, WQ_UNPLUGGED); +	if (wq->thread) +		thread_cancel(wq->thread); + +	wq->thread = NULL; + +	UNSET_FLAG(wq->flags, WQ_UNPLUGGED);  }  /* unplug queue, schedule it again, if appropriate   * Ie: Allow the queue to be drained again   */ -void -work_queue_unplug (struct work_queue *wq) +void work_queue_unplug(struct work_queue *wq)  { -  SET_FLAG (wq->flags, WQ_UNPLUGGED); +	SET_FLAG(wq->flags, WQ_UNPLUGGED); -  /* if thread isnt already waiting, add one */ -  work_queue_schedule (wq, wq->spec.hold); +	/* if thread isnt already waiting, add one */ +	work_queue_schedule(wq, wq->spec.hold);  }  /* timer thread to process a work queue   * will reschedule itself if required, - * otherwise work_queue_item_add  + * otherwise work_queue_item_add   */ -int -work_queue_run (struct thread *thread) +int work_queue_run(struct thread *thread)  { -  struct work_queue *wq; -  struct work_queue_item *item; -  wq_item_status ret; -  unsigned int cycles = 0; -  struct listnode *node, *nnode; -  char yielded = 0; - -  wq = THREAD_ARG (thread); -  wq->thread = NULL; - -  assert (wq && wq->items); - -  /* calculate cycle granularity: -   * list iteration == 1 run -   * listnode processing == 1 cycle -   * granularity == # cycles between checks whether we should yield. -   * -   * granularity should be > 0, and can increase slowly after each run to -   * provide some hysteris, but not past cycles.best or 2*cycles. -   * -   * Best: starts low, can only increase -   * -   * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased  -   *              if we run to end of time slot, can increase otherwise  -   *              by a small factor. -   * -   * We could use just the average and save some work, however we want to be -   * able to adjust quickly to CPU pressure. Average wont shift much if -   * daemon has been running a long time. -   */ -   if (wq->cycles.granularity == 0) -     wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; - -  for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item)) -  { -    assert (item && item->data); -     -    /* dont run items which are past their allowed retries */ -    if (item->ran > wq->spec.max_retries) -      { -        /* run error handler, if any */ -	if (wq->spec.errorfunc) -	  wq->spec.errorfunc (wq, item->data); -	work_queue_item_remove (wq, node); -	continue; -      } - -    /* run and take care of items that want to be retried immediately */ -    do -      { -        ret = wq->spec.workfunc (wq, item->data); -        item->ran++; -      } -    while ((ret == WQ_RETRY_NOW)  -           && (item->ran < wq->spec.max_retries)); - -    switch (ret) -      { -      case WQ_QUEUE_BLOCKED: -        { -          /* decrement item->ran again, cause this isn't an item -           * specific error, and fall through to WQ_RETRY_LATER -           */ -          item->ran--; -        } -      case WQ_RETRY_LATER: -	{ -	  goto stats; -	} -      case WQ_REQUEUE: -	{ -	  item->ran--; -	  work_queue_item_requeue (wq, node); -          /* If a single node is being used with a meta-queue (e.g., zebra), -           * update the next node as we don't want to exit the thread and -           * reschedule it after every node. By definition, WQ_REQUEUE is -           * meant to continue the processing; the yield logic will kick in -           * to terminate the thread when time has exceeded. -           */ -          if (nnode == NULL) -            nnode = node; -	  break; +	struct work_queue *wq; +	struct work_queue_item *item; +	wq_item_status ret; +	unsigned int cycles = 0; +	struct listnode *node, *nnode; +	char yielded = 0; + +	wq = THREAD_ARG(thread); +	wq->thread = NULL; + +	assert(wq && wq->items); + +	/* calculate cycle granularity: +	 * list iteration == 1 run +	 * listnode processing == 1 cycle +	 * granularity == # cycles between checks whether we should yield. +	 * +	 * granularity should be > 0, and can increase slowly after each run to +	 * provide some hysteris, but not past cycles.best or 2*cycles. +	 * +	 * Best: starts low, can only increase +	 * +	 * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased +	 *              if we run to end of time slot, can increase otherwise +	 *              by a small factor. +	 * +	 * We could use just the average and save some work, however we want to +	 * be +	 * able to adjust quickly to CPU pressure. Average wont shift much if +	 * daemon has been running a long time. +	 */ +	if (wq->cycles.granularity == 0) +		wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + +	for (ALL_LIST_ELEMENTS(wq->items, node, nnode, item)) { +		assert(item && item->data); + +		/* dont run items which are past their allowed retries */ +		if (item->ran > wq->spec.max_retries) { +			/* run error handler, if any */ +			if (wq->spec.errorfunc) +				wq->spec.errorfunc(wq, item->data); +			work_queue_item_remove(wq, node); +			continue; +		} + +		/* run and take care of items that want to be retried +		 * immediately */ +		do { +			ret = wq->spec.workfunc(wq, item->data); +			item->ran++; +		} while ((ret == WQ_RETRY_NOW) +			 && (item->ran < wq->spec.max_retries)); + +		switch (ret) { +		case WQ_QUEUE_BLOCKED: { +			/* decrement item->ran again, cause this isn't an item +			 * specific error, and fall through to WQ_RETRY_LATER +			 */ +			item->ran--; +		} +		case WQ_RETRY_LATER: { +			goto stats; +		} +		case WQ_REQUEUE: { +			item->ran--; +			work_queue_item_requeue(wq, node); +			/* If a single node is being used with a meta-queue +			 * (e.g., zebra), +			 * update the next node as we don't want to exit the +			 * thread and +			 * reschedule it after every node. By definition, +			 * WQ_REQUEUE is +			 * meant to continue the processing; the yield logic +			 * will kick in +			 * to terminate the thread when time has exceeded. +			 */ +			if (nnode == NULL) +				nnode = node; +			break; +		} +		case WQ_RETRY_NOW: +		/* a RETRY_NOW that gets here has exceeded max_tries, same as +		 * ERROR */ +		case WQ_ERROR: { +			if (wq->spec.errorfunc) +				wq->spec.errorfunc(wq, item); +		} +		/* fallthru */ +		case WQ_SUCCESS: +		default: { +			work_queue_item_remove(wq, node); +			break; +		} +		} + +		/* completed cycle */ +		cycles++; + +		/* test if we should yield */ +		if (!(cycles % wq->cycles.granularity) +		    && thread_should_yield(thread)) { +			yielded = 1; +			goto stats; +		}  	} -      case WQ_RETRY_NOW: -        /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */ -      case WQ_ERROR: -	{ -	  if (wq->spec.errorfunc) -	    wq->spec.errorfunc (wq, item); -	} -	/* fallthru */ -      case WQ_SUCCESS: -      default: -	{ -	  work_queue_item_remove (wq, node); -	  break; -	} -      } - -    /* completed cycle */ -    cycles++; - -    /* test if we should yield */ -    if ( !(cycles % wq->cycles.granularity)  -        && thread_should_yield (thread)) -      { -        yielded = 1; -        goto stats; -      } -  }  stats:  #define WQ_HYSTERESIS_FACTOR 4 -  /* we yielded, check whether granularity should be reduced */ -  if (yielded && (cycles < wq->cycles.granularity)) -    { -      wq->cycles.granularity = ((cycles > 0) ? cycles -                                             : WORK_QUEUE_MIN_GRANULARITY); -    } -  /* otherwise, should granularity increase? */ -  else if (cycles >= (wq->cycles.granularity)) -    { -      if (cycles > wq->cycles.best) -        wq->cycles.best = cycles; - -      /* along with yielded check, provides hysteresis for granularity */ -      if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR -                                           * WQ_HYSTERESIS_FACTOR)) -        wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */ -      else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR)) -        wq->cycles.granularity += WQ_HYSTERESIS_FACTOR; -    } +	/* we yielded, check whether granularity should be reduced */ +	if (yielded && (cycles < wq->cycles.granularity)) { +		wq->cycles.granularity = +			((cycles > 0) ? cycles : WORK_QUEUE_MIN_GRANULARITY); +	} +	/* otherwise, should granularity increase? */ +	else if (cycles >= (wq->cycles.granularity)) { +		if (cycles > wq->cycles.best) +			wq->cycles.best = cycles; + +		/* along with yielded check, provides hysteresis for granularity +		 */ +		if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR +			      * WQ_HYSTERESIS_FACTOR)) +			wq->cycles.granularity *= +				WQ_HYSTERESIS_FACTOR; /* quick ramp-up */ +		else if (cycles +			 > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR)) +			wq->cycles.granularity += WQ_HYSTERESIS_FACTOR; +	}  #undef WQ_HYSTERIS_FACTOR -   -  wq->runs++; -  wq->cycles.total += cycles; -  if (yielded) -    wq->yields++; + +	wq->runs++; +	wq->cycles.total += cycles; +	if (yielded) +		wq->yields++;  #if 0    printf ("%s: cycles %d, new: best %d, worst %d\n",              __func__, cycles, wq->cycles.best, wq->cycles.granularity);  #endif -   -  /* Is the queue done yet? If it is, call the completion callback. */ -  if (listcount (wq->items) > 0) -    work_queue_schedule (wq, 0); -  else if (wq->spec.completion_func) -    wq->spec.completion_func (wq); -   -  return 0; + +	/* Is the queue done yet? If it is, call the completion callback. */ +	if (listcount(wq->items) > 0) +		work_queue_schedule(wq, 0); +	else if (wq->spec.completion_func) +		wq->spec.completion_func(wq); + +	return 0;  }  | 
