struct stream *obuf;
pthread_mutex_t obuf_mutex;
+ /*
+ * data plane context queue:
+ * When an FPM server connection becomes a bottleneck, we must keep the
+ * data plane contexts until we get a chance to process them.
+ */
+ struct dplane_ctx_q ctxqueue;
+ pthread_mutex_t ctxqueue_mutex;
+
/* data plane events. */
+ struct zebra_dplane_provider *prov;
struct frr_pthread *fthread;
struct thread *t_connect;
struct thread *t_read;
struct thread *t_write;
struct thread *t_event;
+ struct thread *t_dequeue;
/* zebra events. */
struct thread *t_ribreset;
/* Amount of data plane contexts processed. */
uint64_t dplane_contexts;
+ /* Amount of data plane contexts enqueued. */
+ uint64_t ctxqueue_len;
+ /* Peak amount of data plane contexts enqueued. */
+ uint64_t ctxqueue_len_peak;
/* Amount of buffer full events. */
uint64_t buffer_full;
SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
SHOW_COUNTER("Data plane items processed",
gfnc->counters.dplane_contexts);
+ SHOW_COUNTER("Data plane items enqueued",
+ gfnc->counters.ctxqueue_len);
+ SHOW_COUNTER("Data plane items queue peak",
+ gfnc->counters.ctxqueue_len_peak);
SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);
json_object_int_add(jo, "connection-closes", gfnc->counters.connection_closes);
json_object_int_add(jo, "connection-errors", gfnc->counters.connection_errors);
json_object_int_add(jo, "data-plane-contexts", gfnc->counters.dplane_contexts);
+ json_object_int_add(jo, "data-plane-contexts-queue",
+ gfnc->counters.ctxqueue_len);
+ json_object_int_add(jo, "data-plane-contexts-queue-peak",
+ gfnc->counters.ctxqueue_len_peak);
json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
json_object_int_add(jo, "user-configures", gfnc->counters.user_configures);
json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
return 0;
}
+static int fpm_process_queue(struct thread *t)
+{
+ struct fpm_nl_ctx *fnc = THREAD_ARG(t);
+ struct zebra_dplane_ctx *ctx;
+
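+ /*
+  * Hold the queue mutex for the rest of this function; it is released
+  * automatically when the function returns.
+  */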
+ frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
+
+ while (true) {
+ /* Stop when the output buffer cannot fit another full-sized message. */
+ if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
+ break;
+
+ /* Dequeue next item or quit processing. */
+ ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
+ if (ctx == NULL)
+ break;
+
+ fpm_nl_enqueue(fnc, ctx);
+
+ /* Account for the processed entries. */
+ fnc->counters.dplane_contexts++;
+ fnc->counters.ctxqueue_len--;
+
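+ /* Hand the context back to the data plane as completed. */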
+ dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
+ dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
+ }
+
+ /* Check for more items in the queue. */
+ if (fnc->counters.ctxqueue_len)
+ thread_add_timer(fnc->fthread->master, fpm_process_queue,
+ fnc, 0, &fnc->t_dequeue);
+
+ return 0;
+}
+
/**
* Handles external events (e.g. from the CLI, the data plane or others).
*/
pthread_mutex_init(&fnc->obuf_mutex, NULL);
fnc->socket = -1;
fnc->disabled = true;
+ fnc->prov = prov;
+ TAILQ_INIT(&fnc->ctxqueue);
+ pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
return 0;
}
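
Not shown in these hunks is the queue's teardown path. Below is a minimal sketch of what a symmetric cleanup could look like, assuming any contexts still sitting in `ctxqueue` are handed back to the data plane the same way the disconnected path later in this diff does; `fpm_ctxqueue_drain` is a hypothetical helper name, not part of the patch.

static void fpm_ctxqueue_drain(struct fpm_nl_ctx *fnc)
{
	struct zebra_dplane_ctx *ctx;

	pthread_mutex_lock(&fnc->ctxqueue_mutex);
	while ((ctx = dplane_ctx_dequeue(&fnc->ctxqueue)) != NULL) {
		fnc->counters.ctxqueue_len--;
		/* Return the unsent context to the data plane as done. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}
	pthread_mutex_unlock(&fnc->ctxqueue_mutex);
}

Whether the mutex also needs a pthread_mutex_destroy() afterwards depends on the plugin's shutdown path, which is outside this diff.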
* anyway.
*/
if (fnc->socket != -1 && fnc->connecting == false) {
- fpm_nl_enqueue(fnc, ctx);
-
- fnc->counters.dplane_contexts++;
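+ /*
+  * Connected: queue the context and let fpm_process_queue() drain it
+  * from the FPM pthread once the output buffer has room.
+  */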
+ frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
+ dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
+
+ /* Account for the newly queued context. */
+ fnc->counters.ctxqueue_len++;
+ if (fnc->counters.ctxqueue_len_peak <
+ fnc->counters.ctxqueue_len)
+ fnc->counters.ctxqueue_len_peak =
+ fnc->counters.ctxqueue_len;
+ continue;
}
dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
dplane_provider_enqueue_out_ctx(prov, ctx);
}
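+ /* Kick the FPM pthread to process whatever was queued. */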
+ if (fnc->counters.ctxqueue_len)
+ thread_add_timer(fnc->fthread->master, fpm_process_queue,
+ fnc, 0, &fnc->t_dequeue);
+
return 0;
}