From 329e35dab8e11298b3d93ae53335063d21492972 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 14 Mar 2018 00:49:34 -0400 Subject: [PATCH] zebra: multithreaded zserv Handle each zclient in its own thread. Signed-off-by: Quentin Young --- zebra/main.c | 3 + zebra/zserv.c | 229 ++++++++++++++++++++++++++++++++++---------------- zebra/zserv.h | 5 ++ 3 files changed, 165 insertions(+), 72 deletions(-) diff --git a/zebra/main.c b/zebra/main.c index 9c721f0a7e..9bcbaa3c20 100644 --- a/zebra/main.c +++ b/zebra/main.c @@ -37,6 +37,7 @@ #include "logicalrouter.h" #include "libfrr.h" #include "routemap.h" +#include "frr_pthread.h" #include "zebra/rib.h" #include "zebra/zserv.h" @@ -378,6 +379,8 @@ int main(int argc, char **argv) /* Needed for BSD routing socket. */ pid = getpid(); + frr_pthread_init(); + /* This must be done only after locking pidfile (bug #403). */ zebra_zserv_socket_init(zserv_path); diff --git a/zebra/zserv.c b/zebra/zserv.c index 7dcd654240..625174e241 100644 --- a/zebra/zserv.c +++ b/zebra/zserv.c @@ -72,22 +72,43 @@ static void zebra_event(struct zserv *client, enum event event); int zebra_server_send_message(struct zserv *client, struct stream *msg) { - stream_fifo_push(client->obuf_fifo, msg); - zebra_event(client, ZEBRA_WRITE); + pthread_mutex_lock(&client->obuf_mtx); + { + stream_fifo_push(client->obuf_fifo, msg); + zebra_event(client, ZEBRA_WRITE); + } + pthread_mutex_unlock(&client->obuf_mtx); return 0; } -/* Lifecycle ---------------------------------------------------------------- */ - /* Hooks for client connect / disconnect */ DEFINE_HOOK(zapi_client_connect, (struct zserv *client), (client)); DEFINE_KOOH(zapi_client_close, (struct zserv *client), (client)); -/* free zebra client information. */ +/* + * Deinitialize zebra client. + * + * - Deregister and deinitialize related internal resources + * - Gracefully close socket + * - Free associated resources + * - Free client structure + * + * This does *not* take any action on the struct thread * fields. These are + * managed by the owning pthread and any tasks associated with them must have + * been stopped prior to invoking this function. + */ static void zebra_client_free(struct zserv *client) { hook_call(zapi_client_close, client); + /* + * Ensure these have been nulled. This does not equate to the + * associated task(s) being scheduled or unscheduled on the client + * pthread's threadmaster. + */ + assert(!client->t_read); + assert(!client->t_write); + /* Close file descriptor. */ if (client->sock) { unsigned long nroutes; @@ -113,13 +134,9 @@ static void zebra_client_free(struct zserv *client) if (client->wb) buffer_free(client->wb); - /* Release threads. */ - if (client->t_read) - thread_cancel(client->t_read); - if (client->t_write) - thread_cancel(client->t_write); - if (client->t_suicide) - thread_cancel(client->t_suicide); + /* Free buffer mutexes */ + pthread_mutex_destroy(&client->obuf_mtx); + pthread_mutex_destroy(&client->ibuf_mtx); /* Free bitmaps. */ for (afi_t afi = AFI_IP; afi < AFI_MAX; afi++) @@ -134,12 +151,37 @@ static void zebra_client_free(struct zserv *client) } /* - * Called from client thread to terminate itself. + * Finish closing a client. + * + * This task is scheduled by a ZAPI client pthread on the main pthread when it + * wants to stop itself. When this executes, the client connection should + * already have been closed. This task's responsibility is to gracefully + * terminate the client thread, update relevant internal datastructures and + * free any resources allocated by the main thread. */ -static void zebra_client_close(struct zserv *client) +static int zebra_client_handle_close(struct thread *thread) { + struct zserv *client = THREAD_ARG(thread); + frr_pthread_stop(client->pthread, NULL); listnode_delete(zebrad.client_list, client); zebra_client_free(client); + return 0; +} + +/* + * Gracefully shut down a client connection. + * + * Cancel any pending tasks for the client's thread. Then schedule a task on the + * main thread to shut down the calling thread. + * + * Must be called from the client pthread, never the main thread. + */ +static void zebra_client_close(struct zserv *client) +{ + THREAD_OFF(client->t_read); + THREAD_OFF(client->t_write); + thread_add_event(zebrad.master, zebra_client_handle_close, client, 0, + NULL); } /* Make new client. */ @@ -157,6 +199,8 @@ static void zebra_client_create(int sock) client->obuf_fifo = stream_fifo_new(); client->ibuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ); client->obuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ); + pthread_mutex_init(&client->ibuf_mtx, NULL); + pthread_mutex_init(&client->obuf_mtx, NULL); client->wb = buffer_new(0); /* Set table number. */ @@ -177,21 +221,23 @@ static void zebra_client_create(int sock) /* Add this client to linked list. */ listnode_add(zebrad.client_list, client); - zebra_vrf_update_all(client); + struct frr_pthread_attr zclient_pthr_attrs = { + .id = frr_pthread_get_id(), + .start = frr_pthread_attr_default.start, + .stop = frr_pthread_attr_default.stop + }; + client->pthread = frr_pthread_new(&zclient_pthr_attrs, "Zebra API client thread"); - hook_call(zapi_client_connect, client); + zebra_vrf_update_all(client); /* start read loop */ zebra_event(client, ZEBRA_READ); -} -static int zserv_delayed_close(struct thread *thread) -{ - struct zserv *client = THREAD_ARG(thread); + /* call callbacks */ + hook_call(zapi_client_connect, client); - client->t_suicide = NULL; - zebra_client_close(client); - return 0; + /* start pthread */ + frr_pthread_run(client->pthread, NULL); } /* @@ -225,10 +271,6 @@ static int zserv_flush_data(struct thread *thread) struct zserv *client = THREAD_ARG(thread); client->t_write = NULL; - if (client->t_suicide) { - zebra_client_close(client); - return -1; - } switch (buffer_flush_available(client->wb, client->sock)) { case BUFFER_ERROR: zlog_warn( @@ -239,7 +281,7 @@ static int zserv_flush_data(struct thread *thread) break; case BUFFER_PENDING: client->t_write = NULL; - thread_add_write(zebrad.master, zserv_flush_data, client, + thread_add_write(client->pthread->master, zserv_flush_data, client, client->sock, &client->t_write); break; case BUFFER_EMPTY: @@ -260,13 +302,15 @@ static int zserv_write(struct thread *thread) struct stream *msg; int writerv; - if (client->t_suicide) - return -1; - if (client->is_synchronous) return 0; - msg = stream_fifo_pop(client->obuf_fifo); + pthread_mutex_lock(&client->obuf_mtx); + { + msg = stream_fifo_pop(client->obuf_fifo); + } + pthread_mutex_unlock(&client->obuf_mtx); + stream_set_getp(msg, 0); client->last_write_cmd = stream_getw_from(msg, 6); @@ -277,30 +321,27 @@ static int zserv_write(struct thread *thread) switch (writerv) { case BUFFER_ERROR: - zlog_warn( - "%s: buffer_write failed to zserv client fd %d, closing", - __func__, client->sock); - /* - * Schedule a delayed close since many of the functions that - * call this one do not check the return code. They do not - * allow for the possibility that an I/O error may have caused - * the client to be deleted. - */ - client->t_suicide = NULL; - thread_add_event(zebrad.master, zserv_delayed_close, client, 0, - &client->t_suicide); + zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]", + __func__, zebra_route_string(client->proto), + client->sock); + zlog_warn("%s: closing connection to %s", __func__, + zebra_route_string(client->proto)); + zebra_client_close(client); return -1; - case BUFFER_EMPTY: - THREAD_OFF(client->t_write); - break; case BUFFER_PENDING: - thread_add_write(zebrad.master, zserv_flush_data, client, - client->sock, &client->t_write); + thread_add_write(client->pthread->master, zserv_flush_data, + client, client->sock, &client->t_write); + break; + case BUFFER_EMPTY: break; } - if (client->obuf_fifo->count) - zebra_event(client, ZEBRA_WRITE); + pthread_mutex_lock(&client->obuf_mtx); + { + if (client->obuf_fifo->count) + zebra_event(client, ZEBRA_WRITE); + } + pthread_mutex_unlock(&client->obuf_mtx); client->last_write_time = monotime(NULL); return 0; @@ -326,6 +367,18 @@ static void zserv_write_incoming(struct stream *orig, uint16_t command) } #endif +/* + * Read and process messages from a client. + * + * This task runs on the main pthread. It is scheduled by client pthreads when + * they have new messages available on their input queues. The client is passed + * as the task argument. + * + * Each message is popped off the client's input queue and the action associated + * with the message is executed. This proceeds until there are no more messages, + * an error occurs, or the processing limit is reached. In the last case, this + * task reschedules itself. + */ static int zserv_process_messages(struct thread *thread) { struct zserv *client = THREAD_ARG(thread); @@ -334,8 +387,14 @@ static int zserv_process_messages(struct thread *thread) struct stream *msg; bool hdrvalid; + int p2p = zebrad.packets_to_process; + do { - msg = stream_fifo_pop(client->ibuf_fifo); + pthread_mutex_lock(&client->ibuf_mtx); + { + msg = stream_fifo_pop(client->ibuf_fifo); + } + pthread_mutex_unlock(&client->ibuf_mtx); /* break if out of messages */ if (!msg) @@ -363,32 +422,56 @@ static int zserv_process_messages(struct thread *thread) /* process commands */ zserv_handle_commands(client, &hdr, msg, zvrf); - } while (msg); + } while (msg && --p2p); + + /* reschedule self if necessary */ + pthread_mutex_lock(&client->ibuf_mtx); + { + if (client->ibuf_fifo->count) + thread_add_event(zebrad.master, &zserv_process_messages, + client, 0, NULL); + } + pthread_mutex_unlock(&client->ibuf_mtx); return 0; } -/* Handler of zebra service request. */ +/* + * Read and process data from a client socket. + * + * The responsibilities here are to read raw data from the client socket, + * validate the header, encapsulate it into a single stream object, push it + * onto the input queue and then notify the main thread that there is new data + * available. + * + * This function first looks for any data in the client structure's working + * input buffer. If data is present, it is assumed that reading stopped in a + * previous invocation of this task and needs to be resumed to finish a message. + * Otherwise, the socket data stream is assumed to be at the beginning of a new + * ZAPI message (specifically at the header). The header is read and validated. + * If the header passed validation then the length field found in the header is + * used to compute the total length of the message. That much data is read (but + * not inspected), appended to the header, placed into a stream and pushed onto + * the client's input queue. A task is then scheduled on the main thread to + * process the client's input queue. Finally, if all of this was successful, + * this task reschedules itself. + * + * Any failure in any of these actions is handled by terminating the client. + */ static int zserv_read(struct thread *thread) { int sock; struct zserv *client; size_t already; #if defined(HANDLE_ZAPI_FUZZING) - int packets = 1; + int p2p = 1; #else - int packets = zebrad.packets_to_process; + int p2p = zebrad.packets_to_process; #endif - /* Get thread data. Reset reading thread because I'm running. */ sock = THREAD_FD(thread); client = THREAD_ARG(thread); - if (client->t_suicide) { - zebra_client_close(client); - return -1; - } - - while (packets) { + while (p2p--) { struct zmsghdr hdr; ssize_t nb; bool hdrvalid; @@ -486,18 +569,18 @@ static int zserv_read(struct thread *thread) stream_set_getp(client->ibuf_work, 0); struct stream *msg = stream_dup(client->ibuf_work); - stream_fifo_push(client->ibuf_fifo, msg); - - if (client->t_suicide) - goto zread_fail; + pthread_mutex_lock(&client->ibuf_mtx); + { + stream_fifo_push(client->ibuf_fifo, msg); + } + pthread_mutex_unlock(&client->ibuf_mtx); - --packets; stream_reset(client->ibuf_work); } if (IS_ZEBRA_DEBUG_PACKET) zlog_debug("Read %d packets", - zebrad.packets_to_process - packets); + zebrad.packets_to_process - p2p); /* Schedule job to process those packets */ thread_add_event(zebrad.master, &zserv_process_messages, client, 0, @@ -517,16 +600,18 @@ static void zebra_event(struct zserv *client, enum event event) { switch (event) { case ZEBRA_READ: - thread_add_read(zebrad.master, zserv_read, client, client->sock, - &client->t_read); + thread_add_read(client->pthread->master, zserv_read, client, + client->sock, &client->t_read); break; case ZEBRA_WRITE: - thread_add_write(zebrad.master, zserv_write, client, + thread_add_write(client->pthread->master, zserv_write, client, client->sock, &client->t_write); break; } } +/* Main thread lifecycle ----------------------------------------------------*/ + /* Accept code of zebra server socket. */ static int zebra_accept(struct thread *thread) { diff --git a/zebra/zserv.h b/zebra/zserv.h index a5b5acbb33..f466545fc8 100644 --- a/zebra/zserv.h +++ b/zebra/zserv.h @@ -50,11 +50,16 @@ /* Client structure. */ struct zserv { + /* Client pthread */ + struct frr_pthread *pthread; + /* Client file descriptor. */ int sock; /* Input/output buffer to the client. */ + pthread_mutex_t ibuf_mtx; struct stream_fifo *ibuf_fifo; + pthread_mutex_t obuf_mtx; struct stream_fifo *obuf_fifo; /* Private I/O buffers */ -- 2.39.5