From edfeff4251787e2239e1c30e9721aa402d34825d Mon Sep 17 00:00:00 2001 From: Rafael Zalamena Date: Tue, 17 Dec 2019 10:03:54 -0300 Subject: [PATCH] zebra: use atomic operations in FPM FPM has a thread to encode and enqueue the output buffer that might compete with the zebra RIB/RMAC walk on startup, so let's use atomic operations to make sure we are not getting statistics/counters wrong. Signed-off-by: Rafael Zalamena --- zebra/dplane_fpm_nl.c | 81 ++++++++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 36 deletions(-) diff --git a/zebra/dplane_fpm_nl.c b/zebra/dplane_fpm_nl.c index 5fada3b352..284db1af74 100644 --- a/zebra/dplane_fpm_nl.c +++ b/zebra/dplane_fpm_nl.c @@ -102,33 +102,33 @@ struct fpm_nl_ctx { /* Statistic counters. */ struct { /* Amount of bytes read into ibuf. */ - uint64_t bytes_read; + _Atomic uint64_t bytes_read; /* Amount of bytes written from obuf. */ - uint64_t bytes_sent; + _Atomic uint64_t bytes_sent; /* Output buffer current usage. */ - uint64_t obuf_bytes; + _Atomic uint64_t obuf_bytes; /* Output buffer peak usage. */ - uint64_t obuf_peak; + _Atomic uint64_t obuf_peak; /* Amount of connection closes. */ - uint64_t connection_closes; + _Atomic uint64_t connection_closes; /* Amount of connection errors. */ - uint64_t connection_errors; + _Atomic uint64_t connection_errors; /* Amount of user configurations: FNE_RECONNECT. */ - uint64_t user_configures; + _Atomic uint64_t user_configures; /* Amount of user disable requests: FNE_DISABLE. */ - uint64_t user_disables; + _Atomic uint64_t user_disables; /* Amount of data plane context processed. */ - uint64_t dplane_contexts; + _Atomic uint64_t dplane_contexts; /* Amount of data plane contexts enqueued. */ - uint64_t ctxqueue_len; + _Atomic uint64_t ctxqueue_len; /* Peak amount of data plane contexts enqueued. */ - uint64_t ctxqueue_len_peak; + _Atomic uint64_t ctxqueue_len_peak; /* Amount of buffer full events. 
*/ - uint64_t buffer_full; + _Atomic uint64_t buffer_full; } counters; } * gfnc; @@ -415,7 +415,7 @@ static int fpm_read(struct thread *t) rv = stream_read_try(fnc->ibuf, fnc->socket, STREAM_WRITEABLE(fnc->ibuf)); if (rv == 0) { - fnc->counters.connection_closes++; + atomic_fetch_add(&fnc->counters.connection_closes, 1); zlog_debug("%s: connection closed", __func__); fpm_reconnect(fnc); return 0; @@ -425,7 +425,7 @@ static int fpm_read(struct thread *t) || errno == EINTR) return 0; - fnc->counters.connection_errors++; + atomic_fetch_add(&fnc->counters.connection_errors, 1); zlog_debug("%s: connection failure: %s", __func__, strerror(errno)); fpm_reconnect(fnc); @@ -434,7 +434,7 @@ static int fpm_read(struct thread *t) stream_reset(fnc->ibuf); /* Account all bytes read. */ - fnc->counters.bytes_read += rv; + atomic_fetch_add(&fnc->counters.bytes_read, rv); thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket, &fnc->t_read); @@ -464,7 +464,7 @@ static int fpm_write(struct thread *t) zlog_debug("%s: SO_ERROR failed: %s", __func__, strerror(status)); - fnc->counters.connection_errors++; + atomic_fetch_add(&fnc->counters.connection_errors, 1); fpm_reconnect(fnc); return 0; @@ -493,7 +493,7 @@ static int fpm_write(struct thread *t) stream_get_getp(fnc->obuf); bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal); if (bwritten == 0) { - fnc->counters.connection_closes++; + atomic_fetch_add(&fnc->counters.connection_closes, 1); zlog_debug("%s: connection closed", __func__); break; } @@ -505,7 +505,7 @@ static int fpm_write(struct thread *t) if (errno == EAGAIN || errno == EWOULDBLOCK) break; - fnc->counters.connection_errors++; + atomic_fetch_add(&fnc->counters.connection_errors, 1); zlog_debug("%s: connection failure: %s", __func__, strerror(errno)); fpm_reconnect(fnc); @@ -513,10 +513,10 @@ static int fpm_write(struct thread *t) } /* Account all bytes sent. 
*/ - fnc->counters.bytes_sent += bwritten; + atomic_fetch_add(&fnc->counters.bytes_sent, bwritten); /* Account number of bytes free. */ - fnc->counters.obuf_bytes -= bwritten; + atomic_fetch_sub(&fnc->counters.obuf_bytes, bwritten); stream_forward_getp(fnc->obuf, (size_t)bwritten); } @@ -565,7 +565,7 @@ static int fpm_connect(struct thread *t) rv = connect(sock, (struct sockaddr *)&fnc->addr, slen); if (rv == -1 && errno != EINPROGRESS) { - fnc->counters.connection_errors++; + atomic_fetch_add(&fnc->counters.connection_errors, 1); close(sock); zlog_warn("%s: fpm connection failed: %s", __func__, strerror(errno)); @@ -603,6 +603,7 @@ static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx) uint8_t nl_buf[NL_PKT_BUF_SIZE]; size_t nl_buf_len; ssize_t rv; + uint64_t obytes, obytes_peak; nl_buf_len = 0; @@ -689,7 +690,7 @@ static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx) /* Check if we have enough buffer space. */ if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) { - fnc->counters.buffer_full++; + atomic_fetch_add(&fnc->counters.buffer_full, 1); zlog_debug("%s: buffer full: wants to write %lu but has %ld", __func__, nl_buf_len + FPM_HEADER_SIZE, STREAM_WRITEABLE(fnc->obuf)); @@ -709,9 +710,14 @@ static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx) stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len); /* Account number of bytes waiting to be written. 
*/ - fnc->counters.obuf_bytes += nl_buf_len + FPM_HEADER_SIZE; - if (fnc->counters.obuf_peak < fnc->counters.obuf_bytes) - fnc->counters.obuf_peak = fnc->counters.obuf_bytes; + atomic_fetch_add(&fnc->counters.obuf_bytes, + nl_buf_len + FPM_HEADER_SIZE); + obytes = atomic_load_explicit(&fnc->counters.obuf_bytes, + memory_order_relaxed); + obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak, + memory_order_relaxed); + if (obytes_peak < obytes) + atomic_store(&fnc->counters.obuf_peak, obytes); /* Tell the thread to start writing. */ thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket, @@ -908,15 +914,15 @@ static int fpm_process_queue(struct thread *t) fpm_nl_enqueue(fnc, ctx); /* Account the processed entries. */ - fnc->counters.dplane_contexts++; - fnc->counters.ctxqueue_len--; + atomic_fetch_add(&fnc->counters.dplane_contexts, 1); + atomic_fetch_sub(&fnc->counters.ctxqueue_len, 1); dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS); dplane_provider_enqueue_out_ctx(fnc->prov, ctx); } /* Check for more items in the queue. */ - if (fnc->counters.ctxqueue_len) + if (atomic_load(&fnc->counters.ctxqueue_len) > 0) thread_add_timer(fnc->fthread->master, fpm_process_queue, fnc, 0, &fnc->t_dequeue); @@ -935,7 +941,7 @@ static int fpm_process_event(struct thread *t) case FNE_DISABLE: zlog_debug("%s: manual FPM disable event", __func__); fnc->disabled = true; - fnc->counters.user_disables++; + atomic_fetch_add(&fnc->counters.user_disables, 1); /* Call reconnect to disable timers and clean up context. 
*/ fpm_reconnect(fnc); @@ -944,7 +950,7 @@ static int fpm_process_event(struct thread *t) case FNE_RECONNECT: zlog_debug("%s: manual FPM reconnect event", __func__); fnc->disabled = false; - fnc->counters.user_configures++; + atomic_fetch_add(&fnc->counters.user_configures, 1); fpm_reconnect(fnc); break; @@ -1000,6 +1006,7 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov) struct zebra_dplane_ctx *ctx; struct fpm_nl_ctx *fnc; int counter, limit; + uint64_t cur_queue, peak_queue; fnc = dplane_provider_get_data(prov); limit = dplane_provider_get_work_limit(prov); @@ -1017,11 +1024,13 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov) dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx); /* Account the number of contexts. */ - fnc->counters.ctxqueue_len++; - if (fnc->counters.ctxqueue_len_peak < - fnc->counters.ctxqueue_len) - fnc->counters.ctxqueue_len_peak = - fnc->counters.ctxqueue_len; + atomic_fetch_add(&fnc->counters.ctxqueue_len, 1); + cur_queue = atomic_load_explicit(&fnc->counters.ctxqueue_len, + memory_order_relaxed); + peak_queue = atomic_load_explicit(&fnc->counters.ctxqueue_len_peak, + memory_order_relaxed); + if (peak_queue < cur_queue) + atomic_store(&fnc->counters.ctxqueue_len_peak, cur_queue); continue; } @@ -1029,7 +1038,7 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov) dplane_provider_enqueue_out_ctx(prov, ctx); } - if (fnc->counters.ctxqueue_len) + if (atomic_load(&fnc->counters.ctxqueue_len) > 0) thread_add_timer(fnc->fthread->master, fpm_process_queue, fnc, 0, &fnc->t_dequeue); -- 2.39.5