]> git.puffer.fish Git - matthieu/frr.git/commitdiff
zebra: queue data plane context for FPM
authorRafael Zalamena <rzalamena@opensourcerouting.org>
Thu, 12 Dec 2019 18:04:23 +0000 (15:04 -0300)
committerRafael Zalamena <rzalamena@opensourcerouting.org>
Tue, 14 Apr 2020 16:45:39 +0000 (13:45 -0300)
Enqueue all contexts inside FPM to avoid losing updates and to move all
processing to the FPM thread.

This helps in situations with huge amount of routes (e.g. BGP peer
flapping with a million routes).

Signed-off-by: Rafael Zalamena <rzalamena@opensourcerouting.org>
zebra/dplane_fpm_nl.c

index 77326f8d7c00eff92c0d44df958cce67434af1e6..5fada3b352e4b76ccd936b283609ae5d816cc272 100644 (file)
@@ -76,12 +76,22 @@ struct fpm_nl_ctx {
        struct stream *obuf;
        pthread_mutex_t obuf_mutex;
 
+       /*
+        * data plane context queue:
+        * When a FPM server connection becomes a bottleneck, we must keep the
+        * data plane contexts until we get a chance to process them.
+        */
+       struct dplane_ctx_q ctxqueue;
+       pthread_mutex_t ctxqueue_mutex;
+
        /* data plane events. */
+       struct zebra_dplane_provider *prov;
        struct frr_pthread *fthread;
        struct thread *t_connect;
        struct thread *t_read;
        struct thread *t_write;
        struct thread *t_event;
+       struct thread *t_dequeue;
 
        /* zebra events. */
        struct thread *t_ribreset;
@@ -112,6 +122,10 @@ struct fpm_nl_ctx {
 
                /* Amount of data plane context processed. */
                uint64_t dplane_contexts;
+               /* Amount of data plane contexts enqueued. */
+               uint64_t ctxqueue_len;
+               /* Peak amount of data plane contexts enqueued. */
+               uint64_t ctxqueue_len_peak;
 
                /* Amount of buffer full events. */
                uint64_t buffer_full;
@@ -266,6 +280,10 @@ DEFUN(fpm_show_counters, fpm_show_counters_cmd,
        SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
        SHOW_COUNTER("Data plane items processed",
                     gfnc->counters.dplane_contexts);
+       SHOW_COUNTER("Data plane items enqueued",
+                    gfnc->counters.ctxqueue_len);
+       SHOW_COUNTER("Data plane items queue peak",
+                    gfnc->counters.ctxqueue_len_peak);
        SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
        SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
        SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);
@@ -292,6 +310,10 @@ DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
        json_object_int_add(jo, "connection-closes", gfnc->counters.connection_closes);
        json_object_int_add(jo, "connection-errors", gfnc->counters.connection_errors);
        json_object_int_add(jo, "data-plane-contexts", gfnc->counters.dplane_contexts);
+       json_object_int_add(jo, "data-plane-contexts-queue",
+                           gfnc->counters.ctxqueue_len);
+       json_object_int_add(jo, "data-plane-contexts-queue-peak",
+                           gfnc->counters.ctxqueue_len_peak);
        json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
        json_object_int_add(jo, "user-configures", gfnc->counters.user_configures);
        json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
@@ -866,6 +888,41 @@ static int fpm_rmac_reset(struct thread *t)
        return 0;
 }
 
+static int fpm_process_queue(struct thread *t)
+{
+       struct fpm_nl_ctx *fnc = THREAD_ARG(t);
+       struct zebra_dplane_ctx *ctx;
+
+       frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
+
+       while (true) {
+               /* No space available yet. */
+               if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
+                       break;
+
+               /* Dequeue next item or quit processing. */
+               ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
+               if (ctx == NULL)
+                       break;
+
+               fpm_nl_enqueue(fnc, ctx);
+
+               /* Account the processed entries. */
+               fnc->counters.dplane_contexts++;
+               fnc->counters.ctxqueue_len--;
+
+               dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
+               dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
+       }
+
+       /* Check for more items in the queue. */
+       if (fnc->counters.ctxqueue_len)
+               thread_add_timer(fnc->fthread->master, fpm_process_queue,
+                                fnc, 0, &fnc->t_dequeue);
+
+       return 0;
+}
+
 /**
  * Handles external (e.g. CLI, data plane or others) events.
  */
@@ -919,6 +976,9 @@ static int fpm_nl_start(struct zebra_dplane_provider *prov)
        pthread_mutex_init(&fnc->obuf_mutex, NULL);
        fnc->socket = -1;
        fnc->disabled = true;
+       fnc->prov = prov;
+       TAILQ_INIT(&fnc->ctxqueue);
+       pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
 
        return 0;
 }
@@ -953,15 +1013,26 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
                 * anyway.
                 */
                if (fnc->socket != -1 && fnc->connecting == false) {
-                       fpm_nl_enqueue(fnc, ctx);
-
-                       fnc->counters.dplane_contexts++;
+                       frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
+                       dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
+
+                       /* Account the number of contexts. */
+                       fnc->counters.ctxqueue_len++;
+                       if (fnc->counters.ctxqueue_len_peak <
+                               fnc->counters.ctxqueue_len)
+                               fnc->counters.ctxqueue_len_peak =
+                                       fnc->counters.ctxqueue_len;
+                       continue;
                }
 
                dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
                dplane_provider_enqueue_out_ctx(prov, ctx);
        }
 
+       if (fnc->counters.ctxqueue_len)
+               thread_add_timer(fnc->fthread->master, fpm_process_queue,
+                                fnc, 0, &fnc->t_dequeue);
+
        return 0;
 }