]> git.puffer.fish Git - mirror/frr.git/commitdiff
zebra: some more i/o optimizations
authorQuentin Young <qlyoung@cumulusnetworks.com>
Wed, 25 Apr 2018 22:45:21 +0000 (18:45 -0400)
committerQuentin Young <qlyoung@cumulusnetworks.com>
Tue, 29 May 2018 19:06:16 +0000 (19:06 +0000)
* Separate flush task from write task, so we can continue adding to the
  write buffer while it's waiting to flush
* Handle write errors sooner rather than later
* Only schedule a process job if we have packets to process
* Tweak zserv_process_messages to not reschedule itself and rely on
  zserv_read() to do so in all proper cases

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
zebra/zserv.c
zebra/zserv.h

index 4b12aefd2dc38320c91a17d0a97ba1553754860d..20c0bf547ec3739cf6c737ebbb2cbcf93dc10aa5 100644 (file)
@@ -170,6 +170,7 @@ static void zserv_client_close(struct zserv *client)
 {
        THREAD_OFF(client->t_read);
        THREAD_OFF(client->t_write);
+       THREAD_OFF(client->t_flush);
        zserv_event(client, ZSERV_HANDLE_CLOSE);
 }
 
@@ -177,7 +178,6 @@ static int zserv_flush_data(struct thread *thread)
 {
        struct zserv *client = THREAD_ARG(thread);
 
-       client->t_write = NULL;
        switch (buffer_flush_available(client->wb, client->sock)) {
        case BUFFER_ERROR:
                zlog_warn(
@@ -217,6 +217,7 @@ static int zserv_write(struct thread *thread)
        uint32_t wcmd;
        int writerv = BUFFER_EMPTY;
        struct stream_fifo *cache = stream_fifo_new();
+       bool ok = true;
 
        pthread_mutex_lock(&client->obuf_mtx);
        {
@@ -226,7 +227,7 @@ static int zserv_write(struct thread *thread)
        }
        pthread_mutex_unlock(&client->obuf_mtx);
 
-       while (cache->head) {
+       while (stream_fifo_head(cache) && ok) {
                msg = stream_fifo_pop(cache);
                stream_set_getp(msg, 0);
 
@@ -234,26 +235,29 @@ static int zserv_write(struct thread *thread)
                writerv = buffer_write(client->wb, client->sock,
                                       STREAM_DATA(msg), stream_get_endp(msg));
 
+               switch (writerv) {
+               case BUFFER_ERROR:
+                       zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]",
+                                 __func__, zebra_route_string(client->proto),
+                                 client->sock);
+                       zlog_warn("%s: closing connection to %s", __func__,
+                                 zebra_route_string(client->proto));
+                       zserv_client_close(client);
+                       ok = false;
+                       break;
+               /* continue writing */
+               case BUFFER_PENDING:
+               case BUFFER_EMPTY:
+                       break;
+               }
+
                stream_free(msg);
        }
 
-       stream_fifo_free(cache);
-
-       switch (writerv) {
-       case BUFFER_ERROR:
-               zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]",
-                         __func__, zebra_route_string(client->proto),
-                         client->sock);
-               zlog_warn("%s: closing connection to %s", __func__,
-                         zebra_route_string(client->proto));
-               zserv_client_close(client);
-               break;
-       case BUFFER_PENDING:
+       if (ok && writerv == BUFFER_PENDING)
                zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA);
-               break;
-       case BUFFER_EMPTY:
-               break;
-       }
+
+       stream_fifo_free(cache);
 
        atomic_store_explicit(&client->last_write_cmd, wcmd,
                              memory_order_relaxed);
@@ -411,14 +415,15 @@ static int zserv_read(struct thread *thread)
                                                 stream_fifo_pop(cache));
                }
                pthread_mutex_unlock(&client->ibuf_mtx);
+
+               /* Schedule job to process those packets */
+               zserv_event(client, ZSERV_PROCESS_MESSAGES);
+
        }
 
        if (IS_ZEBRA_DEBUG_PACKET)
                zlog_debug("Read %d packets", p2p_orig - p2p);
 
-       /* Schedule job to process those packets */
-       zserv_event(client, ZSERV_PROCESS_MESSAGES);
-
        /* Reschedule ourselves */
        zserv_client_event(client, ZSERV_CLIENT_READ);
 
@@ -446,7 +451,7 @@ static void zserv_client_event(struct zserv *client,
                break;
        case ZSERV_CLIENT_FLUSH_DATA:
                thread_add_write(client->pthread->master, zserv_flush_data,
-                                client, client->sock, &client->t_write);
+                                client, client->sock, &client->t_flush);
                break;
        }
 }
@@ -464,8 +469,11 @@ static void zserv_client_event(struct zserv *client,
  * with the message is executed. This proceeds until there are no more messages,
  * an error occurs, or the processing limit is reached.
  *
- * This task reschedules itself if it cannot process everything on the input
- * queue in one run.
+ * The client's I/O thread can push at most zebrad.packets_to_process messages
+ * onto the input buffer before notifying us there are packets to read. As long
+ * as we always process zebrad.packets_to_process messages here, then we can
+ * rely on the read thread to handle queuing this task enough times to process
+ * everything on the input queue.
  */
 static int zserv_process_messages(struct thread *thread)
 {
@@ -477,19 +485,19 @@ static int zserv_process_messages(struct thread *thread)
 
        pthread_mutex_lock(&client->ibuf_mtx);
        {
-               for (uint32_t i = p2p - 1; i && client->ibuf_fifo->head; --i)
-                       stream_fifo_push(cache,
-                                        stream_fifo_pop(client->ibuf_fifo));
+               uint32_t i;
+               for (i = 0; i < p2p && stream_fifo_head(client->ibuf_fifo);
+                    ++i) {
+                       msg = stream_fifo_pop(client->ibuf_fifo);
+                       stream_fifo_push(cache, msg);
+               }
 
-               if (client->ibuf_fifo->head)
-                       zserv_event(client, ZSERV_PROCESS_MESSAGES);
+               msg = NULL;
        }
        pthread_mutex_unlock(&client->ibuf_mtx);
 
-       while (p2p--) {
+       while (stream_fifo_head(cache)) {
                msg = stream_fifo_pop(cache);
-               if (!msg)
-                       break;
                zserv_handle_commands(client, msg);
                stream_free(msg);
        }
@@ -614,6 +622,7 @@ static int zserv_handle_client_close(struct thread *thread)
         */
        assert(!client->t_read);
        assert(!client->t_write);
+       assert(!client->t_flush);
 
        /* synchronously stop thread */
        frr_pthread_stop(client->pthread, NULL);
index a1b55bf8ebe0cce6d3f53b8ac167af6d57f0bc39..fc338d89e73b3e8005271ad978973271bd4279ca 100644 (file)
@@ -72,6 +72,7 @@ struct zserv {
        /* Threads for read/write. */
        struct thread *t_read;
        struct thread *t_write;
+       struct thread *t_flush;
 
        /* default routing table this client munges */
        int rtm_table;