zebra: use atomic operations in FPM
author Rafael Zalamena <rzalamena@opensourcerouting.org>
Tue, 17 Dec 2019 13:03:54 +0000 (10:03 -0300)
committer Rafael Zalamena <rzalamena@opensourcerouting.org>
Tue, 14 Apr 2020 16:45:39 +0000 (13:45 -0300)
FPM has a thread that encodes data and enqueues it into the output buffer,
which might compete with the zebra RIB/RMAC walk on startup, so let's use
atomic operations to make sure we don't get the statistics/counters wrong.

Signed-off-by: Rafael Zalamena <rzalamena@opensourcerouting.org>
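
For reference, a minimal standalone sketch of the pattern this commit applies
(illustrative only, not FRR code; the struct and function names below are made
up for the example): a plain uint64_t bumped from both the FPM I/O thread and
the zebra thread is a data race, while an _Atomic uint64_t updated with
atomic_fetch_add() is well defined.

#include <inttypes.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical counter block mirroring fpm_nl_ctx's "counters" member. */
struct example_counters {
	_Atomic uint64_t bytes_sent;
};

/* Equivalent of "counters.bytes_sent += bwritten", but race free. */
static void account_write(struct example_counters *c, uint64_t bwritten)
{
	atomic_fetch_add_explicit(&c->bytes_sent, bwritten,
				  memory_order_relaxed);
}

int main(void)
{
	struct example_counters c = { 0 };

	account_write(&c, 128);
	printf("bytes_sent: %" PRIu64 "\n",
	       (uint64_t)atomic_load_explicit(&c.bytes_sent,
					      memory_order_relaxed));
	return 0;
}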
zebra/dplane_fpm_nl.c

index 5fada3b352e4b76ccd936b283609ae5d816cc272..284db1af743c378ce6ed7f6fd2174edf5ee3d78c 100644 (file)
@@ -102,33 +102,33 @@ struct fpm_nl_ctx {
        /* Statistic counters. */
        struct {
                /* Amount of bytes read into ibuf. */
-               uint64_t bytes_read;
+               _Atomic uint64_t bytes_read;
                /* Amount of bytes written from obuf. */
-               uint64_t bytes_sent;
+               _Atomic uint64_t bytes_sent;
                /* Output buffer current usage. */
-               uint64_t obuf_bytes;
+               _Atomic uint64_t obuf_bytes;
                /* Output buffer peak usage. */
-               uint64_t obuf_peak;
+               _Atomic uint64_t obuf_peak;
 
                /* Amount of connection closes. */
-               uint64_t connection_closes;
+               _Atomic uint64_t connection_closes;
                /* Amount of connection errors. */
-               uint64_t connection_errors;
+               _Atomic uint64_t connection_errors;
 
                /* Amount of user configurations: FNE_RECONNECT. */
-               uint64_t user_configures;
+               _Atomic uint64_t user_configures;
                /* Amount of user disable requests: FNE_DISABLE. */
-               uint64_t user_disables;
+               _Atomic uint64_t user_disables;
 
                /* Amount of data plane context processed. */
-               uint64_t dplane_contexts;
+               _Atomic uint64_t dplane_contexts;
                /* Amount of data plane contexts enqueued. */
-               uint64_t ctxqueue_len;
+               _Atomic uint64_t ctxqueue_len;
                /* Peak amount of data plane contexts enqueued. */
-               uint64_t ctxqueue_len_peak;
+               _Atomic uint64_t ctxqueue_len_peak;
 
                /* Amount of buffer full events. */
-               uint64_t buffer_full;
+               _Atomic uint64_t buffer_full;
        } counters;
 } * gfnc;
 
@@ -415,7 +415,7 @@ static int fpm_read(struct thread *t)
        rv = stream_read_try(fnc->ibuf, fnc->socket,
                             STREAM_WRITEABLE(fnc->ibuf));
        if (rv == 0) {
-               fnc->counters.connection_closes++;
+               atomic_fetch_add(&fnc->counters.connection_closes, 1);
                zlog_debug("%s: connection closed", __func__);
                fpm_reconnect(fnc);
                return 0;
@@ -425,7 +425,7 @@ static int fpm_read(struct thread *t)
                    || errno == EINTR)
                        return 0;
 
-               fnc->counters.connection_errors++;
+               atomic_fetch_add(&fnc->counters.connection_errors, 1);
                zlog_debug("%s: connection failure: %s", __func__,
                           strerror(errno));
                fpm_reconnect(fnc);
@@ -434,7 +434,7 @@ static int fpm_read(struct thread *t)
        stream_reset(fnc->ibuf);
 
        /* Account all bytes read. */
-       fnc->counters.bytes_read += rv;
+       atomic_fetch_add(&fnc->counters.bytes_read, rv);
 
        thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
                        &fnc->t_read);
@@ -464,7 +464,7 @@ static int fpm_write(struct thread *t)
                                zlog_debug("%s: SO_ERROR failed: %s", __func__,
                                           strerror(status));
 
-                       fnc->counters.connection_errors++;
+                       atomic_fetch_add(&fnc->counters.connection_errors, 1);
 
                        fpm_reconnect(fnc);
                        return 0;
@@ -493,7 +493,7 @@ static int fpm_write(struct thread *t)
                        stream_get_getp(fnc->obuf);
                bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
                if (bwritten == 0) {
-                       fnc->counters.connection_closes++;
+                       atomic_fetch_add(&fnc->counters.connection_closes, 1);
                        zlog_debug("%s: connection closed", __func__);
                        break;
                }
@@ -505,7 +505,7 @@ static int fpm_write(struct thread *t)
                        if (errno == EAGAIN || errno == EWOULDBLOCK)
                                break;
 
-                       fnc->counters.connection_errors++;
+                       atomic_fetch_add(&fnc->counters.connection_errors, 1);
                        zlog_debug("%s: connection failure: %s", __func__,
                                   strerror(errno));
                        fpm_reconnect(fnc);
@@ -513,10 +513,10 @@ static int fpm_write(struct thread *t)
                }
 
                /* Account all bytes sent. */
-               fnc->counters.bytes_sent += bwritten;
+               atomic_fetch_add(&fnc->counters.bytes_sent, bwritten);
 
                /* Account number of bytes free. */
-               fnc->counters.obuf_bytes -= bwritten;
+               atomic_fetch_sub(&fnc->counters.obuf_bytes, bwritten);
 
                stream_forward_getp(fnc->obuf, (size_t)bwritten);
        }
@@ -565,7 +565,7 @@ static int fpm_connect(struct thread *t)
 
        rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
        if (rv == -1 && errno != EINPROGRESS) {
-               fnc->counters.connection_errors++;
+               atomic_fetch_add(&fnc->counters.connection_errors, 1);
                close(sock);
                zlog_warn("%s: fpm connection failed: %s", __func__,
                          strerror(errno));
@@ -603,6 +603,7 @@ static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
        uint8_t nl_buf[NL_PKT_BUF_SIZE];
        size_t nl_buf_len;
        ssize_t rv;
+       uint64_t obytes, obytes_peak;
 
        nl_buf_len = 0;
 
@@ -689,7 +690,7 @@ static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
 
        /* Check if we have enough buffer space. */
        if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
-               fnc->counters.buffer_full++;
+               atomic_fetch_add(&fnc->counters.buffer_full, 1);
                zlog_debug("%s: buffer full: wants to write %lu but has %ld",
                           __func__, nl_buf_len + FPM_HEADER_SIZE,
                           STREAM_WRITEABLE(fnc->obuf));
@@ -709,9 +710,14 @@ static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
        stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
 
        /* Account number of bytes waiting to be written. */
-       fnc->counters.obuf_bytes += nl_buf_len + FPM_HEADER_SIZE;
-       if (fnc->counters.obuf_peak < fnc->counters.obuf_bytes)
-               fnc->counters.obuf_peak = fnc->counters.obuf_bytes;
+       atomic_fetch_add(&fnc->counters.obuf_bytes,
+                        nl_buf_len + FPM_HEADER_SIZE);
+       obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
+                                     memory_order_relaxed);
+       obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
+                                          memory_order_relaxed);
+       if (obytes_peak < obytes)
+               atomic_store(&fnc->counters.obuf_peak, obytes);
 
        /* Tell the thread to start writing. */
        thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
@@ -908,15 +914,15 @@ static int fpm_process_queue(struct thread *t)
                fpm_nl_enqueue(fnc, ctx);
 
                /* Account the processed entries. */
-               fnc->counters.dplane_contexts++;
-               fnc->counters.ctxqueue_len--;
+               atomic_fetch_add(&fnc->counters.dplane_contexts, 1);
+               atomic_fetch_sub(&fnc->counters.ctxqueue_len, 1);
 
                dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
                dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
        }
 
        /* Check for more items in the queue. */
-       if (fnc->counters.ctxqueue_len)
+       if (atomic_load(&fnc->counters.ctxqueue_len) > 0)
                thread_add_timer(fnc->fthread->master, fpm_process_queue,
                                 fnc, 0, &fnc->t_dequeue);
 
@@ -935,7 +941,7 @@ static int fpm_process_event(struct thread *t)
        case FNE_DISABLE:
                zlog_debug("%s: manual FPM disable event", __func__);
                fnc->disabled = true;
-               fnc->counters.user_disables++;
+               atomic_fetch_add(&fnc->counters.user_disables, 1);
 
                /* Call reconnect to disable timers and clean up context. */
                fpm_reconnect(fnc);
@@ -944,7 +950,7 @@ static int fpm_process_event(struct thread *t)
        case FNE_RECONNECT:
                zlog_debug("%s: manual FPM reconnect event", __func__);
                fnc->disabled = false;
-               fnc->counters.user_configures++;
+               atomic_fetch_add(&fnc->counters.user_configures, 1);
                fpm_reconnect(fnc);
                break;
 
@@ -1000,6 +1006,7 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
        struct zebra_dplane_ctx *ctx;
        struct fpm_nl_ctx *fnc;
        int counter, limit;
+       uint64_t cur_queue, peak_queue;
 
        fnc = dplane_provider_get_data(prov);
        limit = dplane_provider_get_work_limit(prov);
@@ -1017,11 +1024,13 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
                        dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
 
                        /* Account the number of contexts. */
-                       fnc->counters.ctxqueue_len++;
-                       if (fnc->counters.ctxqueue_len_peak <
-                               fnc->counters.ctxqueue_len)
-                               fnc->counters.ctxqueue_len_peak =
-                                       fnc->counters.ctxqueue_len;
+                       atomic_fetch_add(&fnc->counters.ctxqueue_len, 1);
+                       cur_queue = atomic_load_explicit(&fnc->counters.ctxqueue_len,
+                                                        memory_order_relaxed);
+                       peak_queue = atomic_load_explicit(&fnc->counters.ctxqueue_len_peak,
+                                                         memory_order_relaxed);
+                       if (peak_queue < cur_queue)
+                               atomic_store(&fnc->counters.ctxqueue_len_peak, cur_queue);
                        continue;
                }
 
@@ -1029,7 +1038,7 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
                dplane_provider_enqueue_out_ctx(prov, ctx);
        }
 
-       if (fnc->counters.ctxqueue_len)
+       if (atomic_load(&fnc->counters.ctxqueue_len) > 0)
                thread_add_timer(fnc->fthread->master, fpm_process_queue,
                                 fnc, 0, &fnc->t_dequeue);
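
The peak updates in fpm_nl_enqueue() and fpm_nl_process() above bump a counter
with atomic_fetch_add() and then record its peak with separate load and store
calls; with more than one writer, that read-compare-store window can miss a
peak. A hedged alternative sketch (not part of this commit) is a
compare-and-swap loop, as below:

#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/*
 * Illustrative helper: update a peak counter with a CAS loop so that
 * concurrent writers cannot overwrite a larger peak with a smaller one.
 */
static void peak_update(_Atomic uint64_t *peak, uint64_t current)
{
	uint64_t old = atomic_load_explicit(peak, memory_order_relaxed);

	while (old < current
	       && !atomic_compare_exchange_weak_explicit(
			  peak, &old, current, memory_order_relaxed,
			  memory_order_relaxed))
		; /* "old" is reloaded by the failed CAS; retry. */
}

int main(void)
{
	_Atomic uint64_t peak = 0;

	peak_update(&peak, 42);
	peak_update(&peak, 7); /* smaller value, peak stays at 42 */
	assert(atomic_load_explicit(&peak, memory_order_relaxed) == 42);
	return 0;
}

With a helper like this, each peak-tracking site would reduce to a single
peak_update() call right after its atomic_fetch_add().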