From 1572d9aff0bc024b2c9141166886a13487b5e183 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Tue, 24 Apr 2018 11:36:25 -0400 Subject: [PATCH] zserv: optimize zserv_read * Increase the maximum number of packets to read per read job * Store read packets in a local cached buffer to avoid mutex overhead * Only update last-read time / last-command if we actually read a packet * Add missing log line for corrupt header case Signed-off-by: Quentin Young --- zebra/zserv.c | 35 +++++++++++++++++++++++++---------- zebra/zserv.h | 4 ++-- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/zebra/zserv.c b/zebra/zserv.c index 9e2c443765..b666b2e0ab 100644 --- a/zebra/zserv.c +++ b/zebra/zserv.c @@ -304,16 +304,21 @@ static int zserv_read(struct thread *thread) int sock; struct zserv *client; size_t already; + struct stream_fifo *cache = stream_fifo_new(); + int p2p_orig = atomic_load_explicit(&zebrad.packets_to_process, + memory_order_relaxed); + uint32_t p2p; + struct zmsghdr hdr; + #if defined(HANDLE_ZAPI_FUZZING) int p2p = 1; #else - int p2p = zebrad.packets_to_process; + int p2p = p2p_orig; #endif sock = THREAD_FD(thread); client = THREAD_ARG(thread); while (p2p--) { - struct zmsghdr hdr; ssize_t nb; bool hdrvalid; char errmsg[256]; @@ -373,6 +378,7 @@ static int zserv_read(struct thread *thread) "Message has corrupt header\n%s: socket %d message length %u exceeds buffer size %lu", __func__, sock, hdr.length, (unsigned long)STREAM_SIZE(client->ibuf_work)); + zserv_log_message(errmsg, client->ibuf_work, &hdr); goto zread_fail; } @@ -404,26 +410,32 @@ static int zserv_read(struct thread *thread) if (IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV) zserv_log_message(NULL, client->ibuf_work, &hdr); + stream_set_getp(client->ibuf_work, 0); + struct stream *msg = stream_dup(client->ibuf_work); + + stream_fifo_push(cache, msg); + stream_reset(client->ibuf_work); + } + + if (p2p < p2p_orig) { + /* update session statistics */ atomic_store_explicit(&client->last_read_time, monotime(NULL), memory_order_relaxed); atomic_store_explicit(&client->last_read_cmd, hdr.command, memory_order_relaxed); - stream_set_getp(client->ibuf_work, 0); - struct stream *msg = stream_dup(client->ibuf_work); - + /* publish read packets on client's input queue */ pthread_mutex_lock(&client->ibuf_mtx); { - stream_fifo_push(client->ibuf_fifo, msg); + while (cache->head) + stream_fifo_push(client->ibuf_fifo, + stream_fifo_pop(cache)); } pthread_mutex_unlock(&client->ibuf_mtx); - - stream_reset(client->ibuf_work); } if (IS_ZEBRA_DEBUG_PACKET) - zlog_debug("Read %d packets", - zebrad.packets_to_process - p2p); + zlog_debug("Read %d packets", p2p_orig - p2p); /* Schedule job to process those packets */ zserv_event(client, ZSERV_PROCESS_MESSAGES); @@ -431,9 +443,12 @@ static int zserv_read(struct thread *thread) /* Reschedule ourselves */ zserv_client_event(client, ZSERV_CLIENT_READ); + stream_fifo_free(cache); + return 0; zread_fail: + stream_fifo_free(cache); zserv_client_close(client); return -1; } diff --git a/zebra/zserv.h b/zebra/zserv.h index 6cd8b8c292..a1b55bf8eb 100644 --- a/zebra/zserv.h +++ b/zebra/zserv.h @@ -183,8 +183,8 @@ struct zebra_t { /* LSP work queue */ struct work_queue *lsp_process_q; -#define ZEBRA_ZAPI_PACKETS_TO_PROCESS 10 - uint32_t packets_to_process; +#define ZEBRA_ZAPI_PACKETS_TO_PROCESS 100000 + _Atomic uint32_t packets_to_process; }; extern struct zebra_t zebrad; extern unsigned int multipath_num; -- 2.39.5