diff options
| author | paul <paul> | 2005-04-25 16:26:42 +0000 | 
|---|---|---|
| committer | paul <paul> | 2005-04-25 16:26:42 +0000 | 
| commit | 354d119a6569b2c6335ae9309e4606e5cccedb6a (patch) | |
| tree | e29bb213c52bb46b3454f845fa5892df306b82e2 | |
| parent | b64d92a8a88e69f711976a3c12c17667ecaba4ee (diff) | |
2005-04-25 Paul Jakma <paul.jakma@sun.com>
	* workqueue.{c,h}: Helper API for setting up and running queues via
	  background threads.
	* command.c: install the 'show workqueues' command
	* memtypes.c: Add work queue mtypes, and a rib-queue type for
	  a zebra rib work queue.
	* memtypes.h: Updated to match memtypes.c
	* Makefile.am: Add new workqueue files to build.
| -rw-r--r-- | lib/Makefile.am | 5 | ||||
| -rw-r--r-- | lib/command.c | 9 | ||||
| -rw-r--r-- | lib/memtypes.c | 6 | ||||
| -rw-r--r-- | lib/memtypes.h | 5 | ||||
| -rw-r--r-- | lib/workqueue.c | 329 | ||||
| -rw-r--r-- | lib/workqueue.h | 91 | 
6 files changed, 439 insertions, 6 deletions
diff --git a/lib/Makefile.am b/lib/Makefile.am index 03ca5e1261..3763c8b422 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -12,7 +12,7 @@ libzebra_la_SOURCES = \  	sockunion.c prefix.c thread.c if.c memory.c buffer.c table.c hash.c \  	filter.c routemap.c distribute.c stream.c str.c log.c plist.c \  	zclient.c sockopt.c smux.c md5.c if_rmap.c keychain.c privs.c \ -	sigevent.c pqueue.c jhash.c memtypes.c +	sigevent.c pqueue.c jhash.c memtypes.c workqueue.c  BUILT_SOURCES = memtypes.h @@ -25,7 +25,8 @@ pkginclude_HEADERS = \  	memory.h network.h prefix.h routemap.h distribute.h sockunion.h \  	str.h stream.h table.h thread.h vector.h version.h vty.h zebra.h \  	plist.h zclient.h sockopt.h smux.h md5-gnu.h if_rmap.h keychain.h \ -	privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h +	privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h \ +	privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h workqueue.h  EXTRA_DIST = regex.c regex-gnu.h memtypes.awk diff --git a/lib/command.c b/lib/command.c index 641181038e..9b5f75f279 100644 --- a/lib/command.c +++ b/lib/command.c @@ -1,5 +1,5 @@  /* -   $Id: command.c,v 1.46 2005/03/14 20:19:01 paul Exp $ +   $Id: command.c,v 1.47 2005/04/25 16:26:42 paul Exp $     Command interpreter routine for virtual terminal [aka TeletYpe]     Copyright (C) 1997, 98, 99 Kunihiro Ishiguro @@ -31,6 +31,7 @@ Boston, MA 02111-1307, USA.  */  #include "vector.h"  #include "vty.h"  #include "command.h" +#include "workqueue.h"  /* Command vector which includes some level of command lists. Normally     each daemon maintains each own cmdvec. */ @@ -3578,8 +3579,10 @@ cmd_init (int terminal)        install_element (CONFIG_NODE, &service_terminal_length_cmd);        install_element (CONFIG_NODE, &no_service_terminal_length_cmd); -      install_element(VIEW_NODE, &show_thread_cpu_cmd); -      install_element(ENABLE_NODE, &show_thread_cpu_cmd); +      install_element (VIEW_NODE, &show_thread_cpu_cmd); +      install_element (ENABLE_NODE, &show_thread_cpu_cmd); +      install_element (VIEW_NODE, &show_work_queues_cmd); +      install_element (ENABLE_NODE, &show_work_queues_cmd);      }    srand(time(NULL));  } diff --git a/lib/memtypes.c b/lib/memtypes.c index 7caa42a130..7865f5449b 100644 --- a/lib/memtypes.c +++ b/lib/memtypes.c @@ -6,7 +6,7 @@   * The script is sensitive to the format (though not whitespace), see   * the top of memtypes.awk for more details.   * - * $Id: memtypes.c,v 1.3 2005/04/25 14:02:44 paul Exp $ + * $Id: memtypes.c,v 1.4 2005/04/25 16:26:43 paul Exp $   */  #include "zebra.h" @@ -64,6 +64,9 @@ struct memory_list memory_list_lib[] =    { MTYPE_PRIVS,		"Privilege information"		},    { MTYPE_ZLOG,			"Logging"			},    { MTYPE_ZCLIENT,		"Zclient"			}, +  { MTYPE_WORK_QUEUE,		"Work queue"			}, +  { MTYPE_WORK_QUEUE_ITEM,	"Work queue item"		}, +  { MTYPE_WORK_QUEUE_NAME,	"Work queue name string"	},    { -1, NULL },  }; @@ -74,6 +77,7 @@ struct memory_list memory_list_zebra[] =    { MTYPE_VRF_NAME,		"VRF name"			},    { MTYPE_NEXTHOP,		"Nexthop"			},    { MTYPE_RIB,			"RIB"				}, +  { MTYPE_RIB_QUEUE,		"RIB process work queue"	},    { MTYPE_STATIC_IPV4,		"Static IPv4 route"		},    { MTYPE_STATIC_IPV6,		"Static IPv6 route"		},    { -1, NULL }, diff --git a/lib/memtypes.h b/lib/memtypes.h index 2d843c5bbe..b1ca6f6aec 100644 --- a/lib/memtypes.h +++ b/lib/memtypes.h @@ -56,11 +56,16 @@ enum    MTYPE_PRIVS,    MTYPE_ZLOG,    MTYPE_ZCLIENT, +  MTYPE_WORK_QUEUE, +  MTYPE_WORK_QUEUE_ITEM, +  MTYPE_WORK_QUEUE_NAME, +  MTYPE_WORK_QUEUE_SPEC,    MTYPE_RTADV_PREFIX,    MTYPE_VRF,    MTYPE_VRF_NAME,    MTYPE_NEXTHOP,    MTYPE_RIB, +  MTYPE_RIB_QUEUE,    MTYPE_STATIC_IPV4,    MTYPE_STATIC_IPV6,    MTYPE_BGP, diff --git a/lib/workqueue.c b/lib/workqueue.c new file mode 100644 index 0000000000..0c9592d2a4 --- /dev/null +++ b/lib/workqueue.c @@ -0,0 +1,329 @@ +/*  + * Quagga Work Queue Support. + * + * Copyright (C) 2005 Sun Microsystems, Inc. + * + * This file is part of GNU Zebra. + * + * Quagga is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * Quagga is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Quagga; see the file COPYING.  If not, write to the Free + * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA + * 02111-1307, USA.   + */ + +#include <lib/zebra.h> +#include "thread.h" +#include "memory.h" +#include "workqueue.h" +#include "linklist.h" +#include "command.h" +#include "log.h" + +/* master list of work_queues */ +static struct list work_queues; + +#define WORK_QUEUE_MIN_GRANULARITY 1 + +static struct work_queue_item * +work_queue_item_new (struct work_queue *wq) +{ +  struct work_queue_item *item; +  assert (wq); + +  item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,  +                  sizeof (struct work_queue_item)); +   +  return item; +} + +static void +work_queue_item_free (struct work_queue_item *item) +{ +  XFREE (MTYPE_WORK_QUEUE_ITEM, item); +  return; +} + +/* create new work queue */ +struct work_queue * +work_queue_new (struct thread_master *m, const char *queue_name) +{ +  struct work_queue *new; +   +  new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue)); + +  if (new == NULL) +    return new; +   +  new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name); +  new->master = m; +   +  if ( (new->items = list_new ()) == NULL) +    { +      if (new->items) +        list_free (new->items); +       +      XFREE (MTYPE_WORK_QUEUE_NAME, new->name); +      XFREE (MTYPE_WORK_QUEUE, new); +       +      return NULL; +    } +   +  new->items->del = (void (*)(void *)) work_queue_item_free;   +   +  listnode_add (&work_queues, new); +   +  new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; +   +  return new; +} + +void +work_queue_free (struct work_queue *wq) +{ +  /* list_delete frees items via callback */ +  list_delete (wq->items); +  listnode_delete (&work_queues, wq); +   +  XFREE (MTYPE_WORK_QUEUE_NAME, wq->name); +  XFREE (MTYPE_WORK_QUEUE, wq); +  return; +} + +void +work_queue_add (struct work_queue *wq, void *data) +{ +  struct work_queue_item *item; +   +  assert (wq); + +  if (!(item = work_queue_item_new (wq))) +    { +      zlog_err ("%s: unable to get new queue item", __func__); +      return; +    } +   +  item->data = data; +  listnode_add (wq->items, item); +   +  /* if thread isnt already waiting, add one */ +  if (wq->thread == NULL) +    wq->thread = thread_add_background (wq->master, work_queue_run,  +                                        wq, wq->spec.hold); + +  /* XXX: what if we didnt get a thread? try again? */ +   +  return; +} + +static void +work_queue_item_remove (struct work_queue *wq, struct listnode *ln) +{ +  struct work_queue_item *item = listgetdata (ln); + +  assert (item && item->data); + +  /* call private data deletion callback if needed */   +  if (wq->spec.del_item_data) +    wq->spec.del_item_data (item->data); + +  list_delete_node (wq->items, ln); +  work_queue_item_free (item); +   +  return; +} + +static void +work_queue_item_requeue (struct work_queue *wq, struct listnode *ln) +{ +  LISTNODE_DETACH (wq->items, ln); +  LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */ +} + +DEFUN(show_work_queues, +      show_work_queues_cmd, +      "show work-queues", +      SHOW_STR +      "Work Queue information\n") +{ +  struct listnode *node; +  struct work_queue *wq; +  struct timeval tvnow; +   +  gettimeofday (&tvnow, NULL); +   +  vty_out (vty,  +           "%8s  %11s  %8s %21s%s", +           "List","(ms)    ","Q. Runs","Cycle Counts   ", +           VTY_NEWLINE); +  vty_out (vty, +           "%8s  %5s %5s  %8s  %7s %6s %6s %s%s", +           "Items", +           "Delay","Hold", +           "Total", +           "Best","Gran.","Avg.",  +           "Name",  +           VTY_NEWLINE); +  +  for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq)) +    { +      vty_out (vty,"%8d  %5d %5d  %8ld  %7d %6d %6u  %s%s", +               listcount (wq->items), +               wq->spec.delay, wq->spec.hold, +               wq->runs, +               wq->cycles.best, wq->cycles.granularity,  +                 (unsigned int)(wq->cycles.total / wq->runs), +               wq->name, +               VTY_NEWLINE); +    } +     +  return CMD_SUCCESS; +} + +/* timer thread to process a work queue + * will reschedule itself if required, + * otherwise work_queue_item_add  + */ +int +work_queue_run (struct thread *thread) +{ +  struct work_queue *wq; +  struct work_queue_item *item; +  wq_item_status ret; +  unsigned int cycles = 0; +  struct listnode *node, *nnode; +  char yielded = 0; + +  wq = THREAD_ARG (thread); +  wq->thread = NULL; + +  assert (wq && wq->items); + +  /* calculate cycle granularity: +   * list iteration == 1 cycle +   * granularity == # cycles between checks whether we should yield. +   * +   * granularity should be > 0, and can increase slowly after each run to +   * provide some hysteris, but not past cycles.best or 2*cycles. +   * +   * Best: starts low, can only increase +   * +   * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased if we run to end of time +   *              slot, can increase otherwise by a small factor. +   * +   * We could use just the average and save some work, however we want to be +   * able to adjust quickly to CPU pressure. Average wont shift much if +   * daemon has been running a long time. +   */ +   if (wq->cycles.granularity == 0) +     wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + +  for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item)) +  { +    assert (item && item->data); +     +    /* dont run items which are past their allowed retries */ +    if (item->retry_count >= wq->spec.max_retries) +      { +        /* run error handler, if any */ +	if (wq->spec.errorfunc) +	  wq->spec.errorfunc (wq, item->data); +	work_queue_item_remove (wq, node); +	continue; +      } + +    /* run and take care of items that want to be retried immediately */ +    do +      { +        ret = wq->spec.workfunc (item->data); +        item->retry_count++; +      } +    while ((ret == WQ_RETRY_NOW)  +           && (item->retry_count < wq->spec.max_retries)); + +    switch (ret) +      { +      case WQ_RETRY_LATER: +	{ +	  item->retry_count++; +	  goto stats; +	} +      case WQ_REQUEUE: +	{ +	  item->retry_count++; +	  work_queue_item_requeue (wq, node); +	  break; +	} +      case WQ_RETRY_NOW: +      case WQ_ERROR: +	{ +	  if (wq->spec.errorfunc) +	    wq->spec.errorfunc (wq, item); +	} +	/* fall through here is deliberate */ +      case WQ_SUCCESS: +      default: +	{ +	  work_queue_item_remove (wq, node); +	  break; +	} +      } + +    /* completed cycle */ +    cycles++; + +    /* test if we should yield */ +    if ( !(cycles % wq->cycles.granularity)  +        && thread_should_yield (thread)) +      { +        yielded = 1; +        goto stats; +      } +  } + +stats: + +#define WQ_HYSTERIS_FACTOR 2 + +  /* we yielded, check whether granularity should be reduced */ +  if (yielded && (cycles < wq->cycles.granularity)) +    { +      wq->cycles.granularity = ((cycles > 0) ? cycles  +                                             : WORK_QUEUE_MIN_GRANULARITY); +    } +   +  if (cycles > (wq->cycles.granularity)) +    { +      if (cycles > wq->cycles.best) +        wq->cycles.best = cycles; +       +      /* along with yielded check, provides hysteris for granularity */ +      if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR)) +        wq->cycles.granularity += WQ_HYSTERIS_FACTOR; +    } +#undef WQ_HYSTERIS_FACTOR +   +  wq->runs++; +  wq->cycles.total += cycles; + +#if 0 +  printf ("%s: cycles %d, new: best %d, worst %d\n", +            __func__, cycles, wq->cycles.best, wq->cycles.granularity); +#endif +   +  /* Is the queue done yet? */ +  if (listcount (wq->items) > 0) +    wq->thread = thread_add_background (wq->master, work_queue_run, wq, +                                        wq->spec.delay); + +  return 0; +} diff --git a/lib/workqueue.h b/lib/workqueue.h new file mode 100644 index 0000000000..45fffe7ba7 --- /dev/null +++ b/lib/workqueue.h @@ -0,0 +1,91 @@ +/*  + * Quagga Work Queues. + * + * Copyright (C) 2005 Sun Microsystems, Inc. + * + * This file is part of Quagga. + * + * Quagga is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * Quagga is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Quagga; see the file COPYING.  If not, write to the Free + * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA + * 02111-1307, USA.   + */ + +#ifndef _QUAGGA_WORK_QUEUE_H +#define _QUAGGA_WORK_QUEUE_H + +/* Work queue default hold and cycle times - millisec */ +#define WORK_QUEUE_DEFAULT_HOLD  50  /* hold time for initial run of a queue */ +#define WORK_QUEUE_DEFAULT_DELAY 10  /* minimum delay between queue runs */ + +/* action value, for use by item processor and item error handlers */ +typedef enum +{ +  WQ_SUCCESS = 0, +  WQ_ERROR,             /* Error, run error handler if provided */ +  WQ_RETRY_NOW,         /* retry immediately */ +  WQ_RETRY_LATER,       /* retry later, cease processing work queue */ +  WQ_REQUEUE            /* requeue item, continue processing work queue */ +} wq_item_status; + +/* A single work queue item, unsurprisingly */ +struct work_queue_item +{ +  void *data;                           /* opaque data */ +  unsigned short retry_count;           /* number of times retried */             +}; + +struct work_queue +{ +  struct thread_master *master;       /* thread master */ +  struct thread *thread;              /* thread, if one is active */ +  char *name;                         /* work queue name */ +   +  /* specification for this work queue */ +  struct { +    /* work function to process items with */ +    wq_item_status (*workfunc) (void *); + +    /* error handling function, optional */ +    void (*errorfunc) (struct work_queue *, struct work_queue_item *); +     +    /* callback to delete user specific item data */ +    void (*del_item_data) (void *); +     +    /* max number of retries to make for item that errors */ +    unsigned int max_retries;	 + +    unsigned int hold;	/* hold time for first run, in ms */ +    unsigned int delay; /* min delay between queue runs, in ms */ +  } spec; +   +  /* remaining fields should be opaque to users */ +  struct list *items;                 /* queue item list */ +  unsigned long runs;                  /* runs count */ +   +  struct { +    unsigned int best; +    unsigned int granularity; +    unsigned long total; +  } cycles;	/* cycle counts */ +}; + +/* User API */ +struct work_queue *work_queue_new (struct thread_master *, const char *); +void work_queue_free (struct work_queue *); +void work_queue_add (struct work_queue *, void *); + +/* Helpers, exported for thread.c and command.c */ +int work_queue_run (struct thread *); +extern struct cmd_element show_work_queues_cmd; +#endif /* _QUAGGA_WORK_QUEUE_H */  | 
