/* Statistic counters. */
struct {
/* Amount of bytes read into ibuf. */
- uint64_t bytes_read;
+ _Atomic uint64_t bytes_read;
/* Amount of bytes written from obuf. */
- uint64_t bytes_sent;
+ _Atomic uint64_t bytes_sent;
/* Output buffer current usage. */
- uint64_t obuf_bytes;
+ _Atomic uint64_t obuf_bytes;
/* Output buffer peak usage. */
- uint64_t obuf_peak;
+ _Atomic uint64_t obuf_peak;
/* Amount of connection closes. */
- uint64_t connection_closes;
+ _Atomic uint64_t connection_closes;
/* Amount of connection errors. */
- uint64_t connection_errors;
+ _Atomic uint64_t connection_errors;
/* Amount of user configurations: FNE_RECONNECT. */
- uint64_t user_configures;
+ _Atomic uint64_t user_configures;
/* Amount of user disable requests: FNE_DISABLE. */
- uint64_t user_disables;
+ _Atomic uint64_t user_disables;
/* Amount of data plane context processed. */
- uint64_t dplane_contexts;
+ _Atomic uint64_t dplane_contexts;
/* Amount of data plane contexts enqueued. */
- uint64_t ctxqueue_len;
+ _Atomic uint64_t ctxqueue_len;
/* Peak amount of data plane contexts enqueued. */
- uint64_t ctxqueue_len_peak;
+ _Atomic uint64_t ctxqueue_len_peak;
/* Amount of buffer full events. */
- uint64_t buffer_full;
+ _Atomic uint64_t buffer_full;
} counters;
} * gfnc;
rv = stream_read_try(fnc->ibuf, fnc->socket,
STREAM_WRITEABLE(fnc->ibuf));
if (rv == 0) {
- fnc->counters.connection_closes++;
+ atomic_fetch_add(&fnc->counters.connection_closes, 1);
zlog_debug("%s: connection closed", __func__);
fpm_reconnect(fnc);
return 0;
|| errno == EINTR)
return 0;
- fnc->counters.connection_errors++;
+ atomic_fetch_add(&fnc->counters.connection_errors, 1);
zlog_debug("%s: connection failure: %s", __func__,
strerror(errno));
fpm_reconnect(fnc);
stream_reset(fnc->ibuf);
/* Account all bytes read. */
- fnc->counters.bytes_read += rv;
+ atomic_fetch_add(&fnc->counters.bytes_read, rv);
thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
&fnc->t_read);
zlog_debug("%s: SO_ERROR failed: %s", __func__,
strerror(status));
- fnc->counters.connection_errors++;
+ atomic_fetch_add(&fnc->counters.connection_errors, 1);
fpm_reconnect(fnc);
return 0;
stream_get_getp(fnc->obuf);
bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
if (bwritten == 0) {
- fnc->counters.connection_closes++;
+ atomic_fetch_add(&fnc->counters.connection_closes, 1);
zlog_debug("%s: connection closed", __func__);
break;
}
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
- fnc->counters.connection_errors++;
+ atomic_fetch_add(&fnc->counters.connection_errors, 1);
zlog_debug("%s: connection failure: %s", __func__,
strerror(errno));
fpm_reconnect(fnc);
}
/* Account all bytes sent. */
- fnc->counters.bytes_sent += bwritten;
+ atomic_fetch_add(&fnc->counters.bytes_sent, bwritten);
/* Account number of bytes free. */
- fnc->counters.obuf_bytes -= bwritten;
+ atomic_fetch_sub(&fnc->counters.obuf_bytes, bwritten);
stream_forward_getp(fnc->obuf, (size_t)bwritten);
}
rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
if (rv == -1 && errno != EINPROGRESS) {
- fnc->counters.connection_errors++;
+ atomic_fetch_add(&fnc->counters.connection_errors, 1);
close(sock);
zlog_warn("%s: fpm connection failed: %s", __func__,
strerror(errno));
uint8_t nl_buf[NL_PKT_BUF_SIZE];
size_t nl_buf_len;
ssize_t rv;
+ uint64_t obytes, obytes_peak;
nl_buf_len = 0;
/* Check if we have enough buffer space. */
if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
- fnc->counters.buffer_full++;
+ atomic_fetch_add(&fnc->counters.buffer_full, 1);
zlog_debug("%s: buffer full: wants to write %lu but has %ld",
__func__, nl_buf_len + FPM_HEADER_SIZE,
STREAM_WRITEABLE(fnc->obuf));
stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
/* Account number of bytes waiting to be written. */
- fnc->counters.obuf_bytes += nl_buf_len + FPM_HEADER_SIZE;
- if (fnc->counters.obuf_peak < fnc->counters.obuf_bytes)
- fnc->counters.obuf_peak = fnc->counters.obuf_bytes;
+ atomic_fetch_add(&fnc->counters.obuf_bytes,
+ nl_buf_len + FPM_HEADER_SIZE);
+ obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
+ memory_order_relaxed);
+ obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
+ memory_order_relaxed);
+ if (obytes_peak < obytes)
+ atomic_store(&fnc->counters.obuf_peak, obytes);
/* Tell the thread to start writing. */
thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
fpm_nl_enqueue(fnc, ctx);
/* Account the processed entries. */
- fnc->counters.dplane_contexts++;
- fnc->counters.ctxqueue_len--;
+ atomic_fetch_add(&fnc->counters.dplane_contexts, 1);
+ atomic_fetch_sub(&fnc->counters.ctxqueue_len, 1);
dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
}
/* Check for more items in the queue. */
- if (fnc->counters.ctxqueue_len)
+ if (atomic_load(&fnc->counters.ctxqueue_len) > 0)
thread_add_timer(fnc->fthread->master, fpm_process_queue,
fnc, 0, &fnc->t_dequeue);
case FNE_DISABLE:
zlog_debug("%s: manual FPM disable event", __func__);
fnc->disabled = true;
- fnc->counters.user_disables++;
+ atomic_fetch_add(&fnc->counters.user_disables, 1);
/* Call reconnect to disable timers and clean up context. */
fpm_reconnect(fnc);
case FNE_RECONNECT:
zlog_debug("%s: manual FPM reconnect event", __func__);
fnc->disabled = false;
- fnc->counters.user_configures++;
+ atomic_fetch_add(&fnc->counters.user_configures, 1);
fpm_reconnect(fnc);
break;
struct zebra_dplane_ctx *ctx;
struct fpm_nl_ctx *fnc;
int counter, limit;
+ uint64_t cur_queue, peak_queue;
fnc = dplane_provider_get_data(prov);
limit = dplane_provider_get_work_limit(prov);
dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
/* Account the number of contexts. */
- fnc->counters.ctxqueue_len++;
- if (fnc->counters.ctxqueue_len_peak <
- fnc->counters.ctxqueue_len)
- fnc->counters.ctxqueue_len_peak =
- fnc->counters.ctxqueue_len;
+ atomic_fetch_add(&fnc->counters.ctxqueue_len, 1);
+ cur_queue = atomic_load_explicit(&fnc->counters.ctxqueue_len,
+ memory_order_relaxed);
+ peak_queue = atomic_load_explicit(&fnc->counters.ctxqueue_len_peak,
+ memory_order_relaxed);
+ if (peak_queue < cur_queue)
+ atomic_store(&fnc->counters.ctxqueue_len_peak, peak_queue);
continue;
}
dplane_provider_enqueue_out_ctx(prov, ctx);
}
- if (fnc->counters.ctxqueue_len)
+ if (atomic_load(&fnc->counters.ctxqueue_len) > 0)
thread_add_timer(fnc->fthread->master, fpm_process_queue,
fnc, 0, &fnc->t_dequeue);