uint64_t bytes_read;
/* Amount of bytes written from obuf. */
uint64_t bytes_sent;
+ /* Output buffer current usage. */
+ uint64_t obuf_bytes;
+ /* Output buffer peak usage. */
+ uint64_t obuf_peak;
/* Amount of connection closes. */
uint64_t connection_closes;
static int fpm_rmac_send(struct thread *t);
static int fpm_rmac_reset(struct thread *t);
+/*
+ * Helper functions.
+ */
+
+/**
+ * Reorganizes the data on the buffer so it can fit more data.
+ *
+ * @param s stream pointer.
+ */
+static void stream_pulldown(struct stream *s)
+{
+ size_t rlen = STREAM_READABLE(s);
+
+ /* No more data, so just move the pointers. */
+ if (rlen == 0) {
+ stream_reset(s);
+ return;
+ }
+
+ /* Move the available data to the beginning. */
+ memmove(s->data, &s->data[s->getp], rlen);
+ s->getp = 0;
+ s->endp = rlen;
+}
+
/*
* CLI.
*/
SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
+ SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
+ SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
SHOW_COUNTER("Data plane items processed",
jo = json_object_new_object();
json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
+ json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
+ json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
json_object_int_add(jo, "connection-closes", gfnc->counters.connection_closes);
json_object_int_add(jo, "connection-errors", gfnc->counters.connection_errors);
json_object_int_add(jo, "data-plane-contexts", gfnc->counters.dplane_contexts);
break;
}
if (bwritten == -1) {
- if (errno == EAGAIN || errno == EWOULDBLOCK
- || errno == EINTR)
+ /* Attempt to continue if blocked by a signal. */
+ if (errno == EINTR)
+ continue;
+ /* Receiver is probably slow, lets give it some time. */
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
fnc->counters.connection_errors++;
/* Account all bytes sent. */
fnc->counters.bytes_sent += bwritten;
+ /* Account number of bytes free. */
+ fnc->counters.obuf_bytes -= bwritten;
+
stream_forward_getp(fnc->obuf, (size_t)bwritten);
}
/* Stream is not empty yet, we must schedule more writes. */
if (STREAM_READABLE(fnc->obuf)) {
+ stream_pulldown(fnc->obuf);
thread_add_write(fnc->fthread->master, fpm_write, fnc,
fnc->socket, &fnc->t_write);
return 0;
/* Write current data. */
stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
+ /* Account number of bytes waiting to be written. */
+ fnc->counters.obuf_bytes += nl_buf_len + FPM_HEADER_SIZE;
+ if (fnc->counters.obuf_peak < fnc->counters.obuf_bytes)
+ fnc->counters.obuf_peak = fnc->counters.obuf_bytes;
+
/* Tell the thread to start writing. */
thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
&fnc->t_write);