git.puffer.fish Git - mirror/frr.git/commitdiff
zebra: fix write task collision
author Quentin Young <qlyoung@cumulusnetworks.com>
Thu, 26 Apr 2018 04:06:15 +0000 (00:06 -0400)
committer Quentin Young <qlyoung@cumulusnetworks.com>
Tue, 29 May 2018 19:06:16 +0000 (19:06 +0000)
Only one I/O task can be scheduled per file descriptor. Having two
separate tasks for buffer filling and buffer flushing was breaking that
invariant and causing messages to never be written.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
zebra/zserv.c
zebra/zserv.h

index 6ee5ea651ebaad48635fee2e12630fc3f5979ab8..740926e1afa853792851c654e92752eab44f6475 100644 (file)
@@ -76,8 +76,6 @@ enum zserv_client_event {
        ZSERV_CLIENT_READ,
        /* Schedule a buffer write */
        ZSERV_CLIENT_WRITE,
-       /* Schedule a buffer flush */
-       ZSERV_CLIENT_FLUSH_DATA,
 };
 
 /*
@@ -170,54 +168,50 @@ static void zserv_client_close(struct zserv *client)
 {
        THREAD_OFF(client->t_read);
        THREAD_OFF(client->t_write);
-       THREAD_OFF(client->t_flush);
        zserv_event(client, ZSERV_HANDLE_CLOSE);
 }
 
-static int zserv_flush_data(struct thread *thread)
-{
-       struct zserv *client = THREAD_ARG(thread);
-
-       switch (buffer_flush_available(client->wb, client->sock)) {
-       case BUFFER_ERROR:
-               zlog_warn(
-                       "%s: buffer_flush_available failed on zserv client fd %d, closing",
-                       __func__, client->sock);
-               zserv_client_close(client);
-               client = NULL;
-               break;
-       case BUFFER_PENDING:
-               zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA);
-               break;
-       case BUFFER_EMPTY:
-               break;
-       }
-
-       if (client)
-               client->last_write_time = monotime(NULL);
-       return 0;
-}
-
 /*
  * Write all pending messages to client socket.
  *
- * Any messages queued with zserv_send_message() before this function executes
- * will be pushed to the output buffer. The buffer will then take care of
- * writing chunks until it is empty.
+ * This function first attempts to flush any buffered data. If unsuccessful,
+ * the function reschedules itself and returns. If successful, it pops all
+ * available messages from the output queue and continues to write data
+ * directly to the socket until the socket would block. If the socket never
+ * blocks and all data is written, the function returns without rescheduling
+ * itself. If the socket ends up throwing EWOULDBLOCK, the remaining data is
+ * buffered and the function reschedules itself.
  *
- * This function does not reschedule itself. As far as it is concerned it
- * always writes all data. This saves us a mutex hit in thread_add_event at the
- * theoretical expense of buffer memory usage. In practice this should never be
- * an issue.
+ * The utility of the buffer is that it allows us to vastly reduce lock
+ * contention by allowing us to pop *all* messages off the output queue at once
+ * instead of locking and unlocking each time we want to pop a single message
+ * off the queue. The same thing could arguably be accomplished faster by
+ * allowing the main thread to write directly into the buffer instead of
+ * enqueuing packets onto an intermediary queue, but the intermediary queue
+ * allows us to expose information about input and output queues to the user in
+ * terms of number of packets rather than size of data.
  */
 static int zserv_write(struct thread *thread)
 {
        struct zserv *client = THREAD_ARG(thread);
        struct stream *msg;
        uint32_t wcmd;
-       int writerv = BUFFER_EMPTY;
-       struct stream_fifo *cache = stream_fifo_new();
-       bool ok = true;
+       int writerv;
+       struct stream_fifo *cache;
+
+       /* If we have any data pending, try to flush it first */
+       switch (buffer_flush_available(client->wb, client->sock)) {
+       case BUFFER_ERROR:
+               goto zwrite_fail;
+       case BUFFER_PENDING:
+               client->last_write_time = monotime(NULL);
+               zserv_client_event(client, ZSERV_CLIENT_WRITE);
+               return 0;
+       case BUFFER_EMPTY:
+               break;
+       }
+
+       cache = stream_fifo_new();
 
        pthread_mutex_lock(&client->obuf_mtx);
        {
@@ -227,7 +221,7 @@ static int zserv_write(struct thread *thread)
        }
        pthread_mutex_unlock(&client->obuf_mtx);
 
-       while (stream_fifo_head(cache) && ok) {
+       while (stream_fifo_head(cache)) {
                msg = stream_fifo_pop(cache);
                stream_set_getp(msg, 0);
 
@@ -237,15 +231,9 @@ static int zserv_write(struct thread *thread)
 
                switch (writerv) {
                case BUFFER_ERROR:
-                       zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]",
-                                 __func__, zebra_route_string(client->proto),
-                                 client->sock);
-                       zlog_warn("%s: closing connection to %s", __func__,
-                                 zebra_route_string(client->proto));
-                       zserv_client_close(client);
-                       ok = false;
-                       break;
-               /* continue writing */
+                       stream_free(msg);
+                       stream_fifo_free(cache);
+                       goto zwrite_fail;
                case BUFFER_PENDING:
                case BUFFER_EMPTY:
                        break;
@@ -254,8 +242,8 @@ static int zserv_write(struct thread *thread)
                stream_free(msg);
        }
 
-       if (ok && writerv == BUFFER_PENDING)
-               zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA);
+       if (!buffer_empty(client->wb))
+               zserv_client_event(client, ZSERV_CLIENT_WRITE);
 
        stream_fifo_free(cache);
 
@@ -266,6 +254,12 @@ static int zserv_write(struct thread *thread)
                              (uint32_t)monotime(NULL), memory_order_relaxed);
 
        return 0;
+
+zwrite_fail:
+       zlog_warn("%s: could not write to %s [fd = %d], closing.", __func__,
+                 zebra_route_string(client->proto), client->sock);
+       zserv_client_close(client);
+       return 0;
 }
 
 /*
@@ -449,10 +443,6 @@ static void zserv_client_event(struct zserv *client,
                thread_add_write(client->pthread->master, zserv_write, client,
                                 client->sock, &client->t_write);
                break;
-       case ZSERV_CLIENT_FLUSH_DATA:
-               thread_add_write(client->pthread->master, zserv_flush_data,
-                                client, client->sock, &client->t_flush);
-               break;
        }
 }
 
@@ -622,7 +612,6 @@ static int zserv_handle_client_close(struct thread *thread)
         */
        assert(!client->t_read);
        assert(!client->t_write);
-       assert(!client->t_flush);
 
        /* synchronously stop thread */
        frr_pthread_stop(client->pthread, NULL);
index fc338d89e73b3e8005271ad978973271bd4279ca..a1b55bf8ebe0cce6d3f53b8ac167af6d57f0bc39 100644 (file)
@@ -72,7 +72,6 @@ struct zserv {
        /* Threads for read/write. */
        struct thread *t_read;
        struct thread *t_write;
-       struct thread *t_flush;
 
        /* default routing table this client munges */
        int rtm_table;