]> git.puffer.fish Git - matthieu/frr.git/commitdiff
+ initial edition of meta-queue for RIB updates processing (bug #431)
authorDenis Ovsienko <linux@pilot.org.ua>
Mon, 2 Jun 2008 12:03:22 +0000 (12:03 +0000)
committerDenis Ovsienko <linux@pilot.org.ua>
Mon, 2 Jun 2008 12:03:22 +0000 (12:03 +0000)
lib/ChangeLog
lib/workqueue.c
lib/workqueue.h
zebra/ChangeLog
zebra/connected.c
zebra/rib.h
zebra/zebra_rib.c
zebra/zserv.h

index da0fa8ca3b3e545f95b7dc5ac4ef5f68dfe1f64e..d8eb06ecef62c1f04cfd0a0a0f0f9ecc70bc2f03 100644 (file)
@@ -1,3 +1,10 @@
+2008-06-02 Denis Ovsienko
+
+       * workqueue.[ch]: completely drop WQ_AIM_HEAD flag and
+         work_queue_aim_head() function, they aren't needed any more
+         with the new meta queue structure; (work_queue_run) don't
+         increment the counter on work item requeueing
+
 2008-02-28 Paul Jakma <paul.jakma@sun.com>
 
        * log.c: (mes_lookup) Sowmini Varadhan diagnosed a problem where
index 8880b9e22ed01ae31ed1c9290940ecd94cb233a2..1d32d24117853dc105d83861bb870d4a15327696 100644 (file)
@@ -67,7 +67,6 @@ work_queue_new (struct thread_master *m, const char *queue_name)
   new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
   new->master = m;
   SET_FLAG (new->flags, WQ_UNPLUGGED);
-  UNSET_FLAG (new->flags, WQ_AIM_HEAD);
   
   if ( (new->items = list_new ()) == NULL)
     {
@@ -131,10 +130,7 @@ work_queue_add (struct work_queue *wq, void *data)
     }
   
   item->data = data;
-  if (CHECK_FLAG (wq->flags, WQ_AIM_HEAD))
-    listnode_add_after (wq->items, NULL, item);
-  else
-    listnode_add (wq->items, item);
+  listnode_add (wq->items, item);
   
   work_queue_schedule (wq, wq->spec.hold);
   
@@ -231,15 +227,6 @@ work_queue_unplug (struct work_queue *wq)
   work_queue_schedule (wq, wq->spec.hold);
 }
 
-void
-work_queue_aim_head (struct work_queue *wq, const unsigned aim_head)
-{
-  if (aim_head)
-    SET_FLAG (wq->flags, WQ_AIM_HEAD);
-  else
-    UNSET_FLAG (wq->flags, WQ_AIM_HEAD);
-}
-
 /* timer thread to process a work queue
  * will reschedule itself if required,
  * otherwise work_queue_item_add 
@@ -317,6 +304,7 @@ work_queue_run (struct thread *thread)
        }
       case WQ_REQUEUE:
        {
+         item->ran--;
          work_queue_item_requeue (wq, node);
          break;
        }
index 3150c32e7adf7c2fb2c8198143f4be61a71d1bb6..f59499a0a78b8ce6f3735f5d1bb420cf3bc3ad9f 100644 (file)
@@ -48,7 +48,6 @@ struct work_queue_item
 };
 
 #define WQ_UNPLUGGED   (1 << 0) /* available for draining */
-#define WQ_AIM_HEAD    (1 << 1) /* add new items before list head, not after tail */
 
 struct work_queue
 {
@@ -119,8 +118,6 @@ extern void work_queue_add (struct work_queue *, void *);
 extern void work_queue_plug (struct work_queue *wq);
 /* unplug the queue, allow it to be drained again */
 extern void work_queue_unplug (struct work_queue *wq);
-/* control the value for WQ_AIM_HEAD flag */
-extern void work_queue_aim_head (struct work_queue *wq, const unsigned);
 
 /* Helpers, exported for thread.c and command.c */
 extern int work_queue_run (struct thread *);
index 6f6dfa2c2e8990a8b4344dc635b9dc551165ce87..6483f2c7445926b0001ddde152e7bd504250a7c2 100644 (file)
@@ -1,3 +1,18 @@
+2008-06-02 Denis Ovsienko
+
+       * connected.c: (connected_up_ipv4, connected_down_ipv4,
+         connected_up_ipv6, connected_down_ipv6): don't call
+         work_queue_aim_head()
+       * rib.h: adjust RIB_ROUTE_QUEUED macro for meta_queue,
+         declare meta_queue structure
+       * zebra_rib.c: (process_subq, meta_queue_process, rib_meta_queue_add,
+         meta_queue_new) new functions; (rib_queue_add) don't try checking
+         RIB_QUEUE_ADDED flag, rib_meta_queue_add() does it better, take care
+         of the meta queue instead; (rib_queue_init) initialize the meta queue
+         as well; (rib_lookup_and_pushup) don't call work_queue_aim_head();
+         (rib_process) only do actual processing, don't do deallocation;
+       * zserv.h: include meta_queue field into zebra_t structure
+
 2008-05-29 Stephen Hemminger <stephen.hemminger@vyatta.com>
 
        * rt_netlink.c: (netlink_install_filter) BPF filter to catch and
index 8bf1d337dbcd67f0f2113f17e95c8f25df019a17..ad3e96078b37ffebafdc437e4399ca04f82f80df 100644 (file)
@@ -188,15 +188,8 @@ connected_up_ipv4 (struct interface *ifp, struct connected *ifc)
   if (prefix_ipv4_any (&p))
     return;
 
-  /* Always push arriving/departing connected routes into the head of
-   * the working queue to make possible proper validation of the rest
-   * of the RIB queue (which will contain the whole RIB after the first
-   * call to rib_update()).
-   */
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_add_ipv4 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, NULL, ifp->ifindex,
        RT_TABLE_MAIN, ifp->metric, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
@@ -302,9 +295,7 @@ connected_down_ipv4 (struct interface *ifp, struct connected *ifc)
     return;
 
   /* Same logic as for connected_up_ipv4(): push the changes into the head. */
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_delete_ipv4 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, ifp->ifindex, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
@@ -349,10 +340,8 @@ connected_up_ipv6 (struct interface *ifp, struct connected *ifc)
     return;
 #endif
 
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_add_ipv6 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, ifp->ifindex, 0,
                 ifp->metric, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
@@ -426,9 +415,7 @@ connected_down_ipv6 (struct interface *ifp, struct connected *ifc)
   if (IN6_IS_ADDR_UNSPECIFIED (&p.prefix))
     return;
 
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_delete_ipv6 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, ifp->ifindex, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
index 9621f2c8de104285a2b90a504c8887eea3d77865..887ed3c2c154174889c0062c6938a7301ff1062f 100644 (file)
@@ -40,7 +40,7 @@ struct rib
 {
   /* Status Flags for the *route_node*, but kept in the head RIB.. */
   u_char rn_status;
-#define RIB_ROUTE_QUEUED       (1 << 0)
+#define RIB_ROUTE_QUEUED(x)    (1 << (x))
 
   /* Link list. */
   struct rib *next;
@@ -83,6 +83,20 @@ struct rib
   u_char nexthop_fib_num;
 };
 
+/* meta-queue structure:
+ * sub-queue 0: connected, kernel
+ * sub-queue 1: static
+ * sub-queue 2: RIP, RIPng, OSPF, OSPF6, IS-IS
+ * sub-queue 3: iBGP, eBGP
+ * sub-queue 4: any other origin (if any)
+ */
+#define MQ_SIZE 5
+struct meta_queue
+{
+  struct list *subq[MQ_SIZE];
+  u_int32_t size; /* sum of lengths of all subqueues */
+};
+
 /* Static route information. */
 struct static_ipv4
 {
index c6af32904942ec3459899239e8afaa6a17cd585b..4cb72ba8c8ed3c3146c0689a0d906842dba2883d 100644 (file)
@@ -981,15 +981,14 @@ rib_uninstall (struct route_node *rn, struct rib *rib)
 static void rib_unlink (struct route_node *, struct rib *);
 
 /* Core function for processing routing information base. */
-static wq_item_status
-rib_process (struct work_queue *wq, void *data)
+static void
+rib_process (struct route_node *rn)
 {
   struct rib *rib;
   struct rib *next;
   struct rib *fib = NULL;
   struct rib *select = NULL;
   struct rib *del = NULL;
-  struct route_node *rn = data;
   int installed = 0;
   struct nexthop *nexthop = NULL;
   char buf[INET6_ADDRSTRLEN];
@@ -1177,10 +1176,95 @@ rib_process (struct work_queue *wq, void *data)
 end:
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_debug ("%s: %s/%d: rn %p dequeued", __func__, buf, rn->p.prefixlen, rn);
-  if (rn->info)
-    UNSET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED);  
-  route_unlock_node (rn); /* rib queue lock */
-  return WQ_SUCCESS;
+}
+
+/* Take a list of route_node structs and return 1, if there was a record picked from
+ * it and processed by rib_process(). Don't process more, than one RN record; operate
+ * only in the specified sub-queue.
+ */
+unsigned int
+process_subq (struct list * subq, u_char qindex)
+{
+  struct listnode *lnode;
+  struct route_node *rnode;
+  if (!(lnode = listhead (subq)))
+    return 0;
+  rnode = listgetdata (lnode);
+  rib_process (rnode);
+  if (rnode->info) /* The first RIB record is holding the flags bitmask. */
+    UNSET_FLAG (((struct rib *)rnode->info)->rn_status, RIB_ROUTE_QUEUED(qindex));
+  route_unlock_node (rnode);
+  list_delete_node (subq, lnode);
+  return 1;
+}
+
+/* Dispatch the meta queue by picking, processing and unlocking the next RN from
+ * a non-empty sub-queue with lowest priority. wq is equal to zebra->ribq and data
+ * is pointed to the meta queue structure.
+ */
+static wq_item_status
+meta_queue_process (struct work_queue *dummy, void *data)
+{
+  struct meta_queue * mq = data;
+  u_char i;
+  for (i = 0; i < MQ_SIZE; i++)
+    if (process_subq (mq->subq[i], i))
+    {
+      mq->size--;
+      break;
+    }
+  return mq->size ? WQ_REQUEUE : WQ_SUCCESS;
+}
+
+/* Look into the RN and queue it into one or more priority queues, increasing the size
+ * for each data push done.
+ */
+void rib_meta_queue_add (struct meta_queue *mq, struct route_node *rn)
+{
+  u_char qindex;
+  struct rib *rib;
+  char buf[INET6_ADDRSTRLEN];
+  if (IS_ZEBRA_DEBUG_RIB_Q)
+    inet_ntop (rn->p.family, &rn->p.u.prefix, buf, INET6_ADDRSTRLEN);
+  for (rib = rn->info; rib; rib = rib->next)
+  {
+    switch (rib->type)
+    {
+      case ZEBRA_ROUTE_KERNEL:
+      case ZEBRA_ROUTE_CONNECT:
+        qindex = 0;
+        break;
+      case ZEBRA_ROUTE_STATIC:
+        qindex = 1;
+        break;
+      case ZEBRA_ROUTE_RIP:
+      case ZEBRA_ROUTE_RIPNG:
+      case ZEBRA_ROUTE_OSPF:
+      case ZEBRA_ROUTE_OSPF6:
+      case ZEBRA_ROUTE_ISIS:
+        qindex = 2;
+        break;
+      case ZEBRA_ROUTE_BGP:
+        qindex = 3;
+        break;
+      default:
+        qindex = 4;
+        break;
+    }
+    /* Invariant: at this point we always have rn->info set. */
+    if (CHECK_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED(qindex)))
+    {
+      if (IS_ZEBRA_DEBUG_RIB_Q)
+        zlog_debug ("%s: %s/%d: rn %p is already queued in sub-queue %u", __func__, buf, rn->p.prefixlen, rn, qindex);
+      continue;
+    }
+    SET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED(qindex));
+    listnode_add (mq->subq[qindex], rn);
+    route_lock_node (rn);
+    mq->size++;
+    if (IS_ZEBRA_DEBUG_RIB_Q)
+      zlog_debug ("%s: %s/%d: queued rn %p into sub-queue %u", __func__, buf, rn->p.prefixlen, rn, qindex);
+  }
 }
 
 /* Add route_node to work queue and schedule processing */
@@ -1202,17 +1286,6 @@ rib_queue_add (struct zebra_t *zebra, struct route_node *rn)
       return;
     }
 
-  /* Route-table node already queued, so nothing to do */
-  if (CHECK_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED))
-    {
-      if (IS_ZEBRA_DEBUG_RIB_Q)
-        zlog_debug ("%s: %s/%d: rn %p already queued", __func__, buf,
-          rn->p.prefixlen, rn);
-      return;
-    }
-
-  route_lock_node (rn); /* rib queue lock */
-
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_info ("%s: %s/%d: work queue added", __func__, buf, rn->p.prefixlen);
 
@@ -1221,13 +1294,21 @@ rib_queue_add (struct zebra_t *zebra, struct route_node *rn)
   if (zebra->ribq == NULL)
     {
       zlog_err ("%s: work_queue does not exist!", __func__);
-      route_unlock_node (rn);
       return;
     }
-  
-  work_queue_add (zebra->ribq, rn);
 
-  SET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED);
+  /* The RIB queue should normally be either empty or holding the only work_queue_item
+   * element. In the latter case this element would hold a pointer to the meta queue
+   * structure, which must be used to actually queue the route nodes to process. So
+   * create the MQ holder, if necessary, then push the work into it in any case.
+   * This semantics was introduced after 0.99.9 release.
+   */
+
+  /* Should I invent work_queue_empty() and use it, or it's Ok to do as follows? */
+  if (!zebra->ribq->items->count)
+    work_queue_add (zebra->ribq, zebra->mq);
+
+  rib_meta_queue_add (zebra->mq, rn);
 
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_debug ("%s: %s/%d: rn %p queued", __func__, buf, rn->p.prefixlen, rn);
@@ -1235,6 +1316,30 @@ rib_queue_add (struct zebra_t *zebra, struct route_node *rn)
   return;
 }
 
+/* Create new meta queue. A destructor function doesn't seem to be necessary here. */
+struct meta_queue *
+meta_queue_new ()
+{
+  struct meta_queue *new;
+  unsigned i, failed = 0;
+
+  if ((new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct meta_queue))) == NULL)
+    return NULL;
+  for (i = 0; i < MQ_SIZE; i++)
+    if ((new->subq[i] = list_new ()) == NULL)
+      failed = 1;
+  if (failed)
+  {
+    for (i = 0; i < MQ_SIZE; i++)
+      if (new->subq[i])
+        list_delete (new->subq[i]);
+    XFREE (MTYPE_WORK_QUEUE, new);
+    return NULL;
+  }
+  new->size = 0;
+  return new;
+}
+
 /* initialise zebra rib work queue */
 static void
 rib_queue_init (struct zebra_t *zebra)
@@ -1249,12 +1354,17 @@ rib_queue_init (struct zebra_t *zebra)
     }
 
   /* fill in the work queue spec */
-  zebra->ribq->spec.workfunc = &rib_process;
+  zebra->ribq->spec.workfunc = &meta_queue_process;
   zebra->ribq->spec.errorfunc = NULL;
   /* XXX: TODO: These should be runtime configurable via vty */
   zebra->ribq->spec.max_retries = 3;
   zebra->ribq->spec.hold = rib_process_hold_time;
   
+  if (!(zebra->mq = meta_queue_new ()))
+  {
+    zlog_err ("%s: could not initialise meta queue!", __func__);
+    return;
+  }
   return;
 }
 
@@ -1663,11 +1773,7 @@ void rib_lookup_and_pushup (struct prefix_ipv4 * p)
     }
   }
   if (changed)
-  {
-    work_queue_aim_head (zebrad.ribq, 1);
     rib_queue_add (&zebrad, rn);
-    work_queue_aim_head (zebrad.ribq, 0);
-  }
 }
 
 int
index 5e2237764088c6294561f5c0e52a65b43baae60a..87a33a4550efb422d295130f20853b800b1955a2 100644 (file)
@@ -80,6 +80,7 @@ struct zebra_t
 
   /* rib work queue */
   struct work_queue *ribq;
+  struct meta_queue *mq;
 };
 
 /* Count prefix size from mask length */