int zebra_server_send_message(struct zserv *client, struct stream *msg)
{
- stream_fifo_push(client->obuf_fifo, msg);
- zebra_event(client, ZEBRA_WRITE);
+ pthread_mutex_lock(&client->obuf_mtx);
+ {
+ stream_fifo_push(client->obuf_fifo, msg);
+ zebra_event(client, ZEBRA_WRITE);
+ }
+ pthread_mutex_unlock(&client->obuf_mtx);
return 0;
}
-/* Lifecycle ---------------------------------------------------------------- */
-
/* Hooks for client connect / disconnect */
DEFINE_HOOK(zapi_client_connect, (struct zserv *client), (client));
DEFINE_KOOH(zapi_client_close, (struct zserv *client), (client));
-/* free zebra client information. */
+/*
+ * Deinitialize zebra client.
+ *
+ * - Deregister and deinitialize related internal resources
+ * - Gracefully close socket
+ * - Free associated resources
+ * - Free client structure
+ *
+ * This does *not* take any action on the struct thread * fields. These are
+ * managed by the owning pthread and any tasks associated with them must have
+ * been stopped prior to invoking this function.
+ */
static void zebra_client_free(struct zserv *client)
{
hook_call(zapi_client_close, client);
+ /*
+ * Ensure these have been nulled. This does not equate to the
+ * associated task(s) being scheduled or unscheduled on the client
+ * pthread's threadmaster.
+ */
+ assert(!client->t_read);
+ assert(!client->t_write);
+
/* Close file descriptor. */
if (client->sock) {
unsigned long nroutes;
if (client->wb)
buffer_free(client->wb);
- /* Release threads. */
- if (client->t_read)
- thread_cancel(client->t_read);
- if (client->t_write)
- thread_cancel(client->t_write);
- if (client->t_suicide)
- thread_cancel(client->t_suicide);
+ /* Free buffer mutexes */
+ pthread_mutex_destroy(&client->obuf_mtx);
+ pthread_mutex_destroy(&client->ibuf_mtx);
/* Free bitmaps. */
for (afi_t afi = AFI_IP; afi < AFI_MAX; afi++)
}
/*
- * Called from client thread to terminate itself.
+ * Finish closing a client.
+ *
+ * This task is scheduled by a ZAPI client pthread on the main pthread when it
+ * wants to stop itself. When this executes, the client connection should
+ * already have been closed. This task's responsibility is to gracefully
+ * terminate the client thread, update relevant internal datastructures and
+ * free any resources allocated by the main thread.
*/
-static void zebra_client_close(struct zserv *client)
+static int zebra_client_handle_close(struct thread *thread)
{
+ struct zserv *client = THREAD_ARG(thread);
+ frr_pthread_stop(client->pthread, NULL);
listnode_delete(zebrad.client_list, client);
zebra_client_free(client);
+ return 0;
+}
+
+/*
+ * Gracefully shut down a client connection.
+ *
+ * Cancel any pending tasks for the client's thread. Then schedule a task on the
+ * main thread to shut down the calling thread.
+ *
+ * Must be called from the client pthread, never the main thread.
+ */
+static void zebra_client_close(struct zserv *client)
+{
+ THREAD_OFF(client->t_read);
+ THREAD_OFF(client->t_write);
+ thread_add_event(zebrad.master, zebra_client_handle_close, client, 0,
+ NULL);
}
/* Make new client. */
client->obuf_fifo = stream_fifo_new();
client->ibuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
client->obuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
+ pthread_mutex_init(&client->ibuf_mtx, NULL);
+ pthread_mutex_init(&client->obuf_mtx, NULL);
client->wb = buffer_new(0);
/* Set table number. */
/* Add this client to linked list. */
listnode_add(zebrad.client_list, client);
- zebra_vrf_update_all(client);
+ struct frr_pthread_attr zclient_pthr_attrs = {
+ .id = frr_pthread_get_id(),
+ .start = frr_pthread_attr_default.start,
+ .stop = frr_pthread_attr_default.stop
+ };
+ client->pthread = frr_pthread_new(&zclient_pthr_attrs, "Zebra API client thread");
- hook_call(zapi_client_connect, client);
+ zebra_vrf_update_all(client);
/* start read loop */
zebra_event(client, ZEBRA_READ);
-}
-static int zserv_delayed_close(struct thread *thread)
-{
- struct zserv *client = THREAD_ARG(thread);
+ /* call callbacks */
+ hook_call(zapi_client_connect, client);
- client->t_suicide = NULL;
- zebra_client_close(client);
- return 0;
+ /* start pthread */
+ frr_pthread_run(client->pthread, NULL);
}
/*
struct zserv *client = THREAD_ARG(thread);
client->t_write = NULL;
- if (client->t_suicide) {
- zebra_client_close(client);
- return -1;
- }
switch (buffer_flush_available(client->wb, client->sock)) {
case BUFFER_ERROR:
zlog_warn(
break;
case BUFFER_PENDING:
client->t_write = NULL;
- thread_add_write(zebrad.master, zserv_flush_data, client,
+ thread_add_write(client->pthread->master, zserv_flush_data, client,
client->sock, &client->t_write);
break;
case BUFFER_EMPTY:
struct stream *msg;
int writerv;
- if (client->t_suicide)
- return -1;
-
if (client->is_synchronous)
return 0;
- msg = stream_fifo_pop(client->obuf_fifo);
+ pthread_mutex_lock(&client->obuf_mtx);
+ {
+ msg = stream_fifo_pop(client->obuf_fifo);
+ }
+ pthread_mutex_unlock(&client->obuf_mtx);
+
stream_set_getp(msg, 0);
client->last_write_cmd = stream_getw_from(msg, 6);
switch (writerv) {
case BUFFER_ERROR:
- zlog_warn(
- "%s: buffer_write failed to zserv client fd %d, closing",
- __func__, client->sock);
- /*
- * Schedule a delayed close since many of the functions that
- * call this one do not check the return code. They do not
- * allow for the possibility that an I/O error may have caused
- * the client to be deleted.
- */
- client->t_suicide = NULL;
- thread_add_event(zebrad.master, zserv_delayed_close, client, 0,
- &client->t_suicide);
+ zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]",
+ __func__, zebra_route_string(client->proto),
+ client->sock);
+ zlog_warn("%s: closing connection to %s", __func__,
+ zebra_route_string(client->proto));
+ zebra_client_close(client);
return -1;
- case BUFFER_EMPTY:
- THREAD_OFF(client->t_write);
- break;
case BUFFER_PENDING:
- thread_add_write(zebrad.master, zserv_flush_data, client,
- client->sock, &client->t_write);
+ thread_add_write(client->pthread->master, zserv_flush_data,
+ client, client->sock, &client->t_write);
+ break;
+ case BUFFER_EMPTY:
break;
}
- if (client->obuf_fifo->count)
- zebra_event(client, ZEBRA_WRITE);
+ pthread_mutex_lock(&client->obuf_mtx);
+ {
+ if (client->obuf_fifo->count)
+ zebra_event(client, ZEBRA_WRITE);
+ }
+ pthread_mutex_unlock(&client->obuf_mtx);
client->last_write_time = monotime(NULL);
return 0;
}
#endif
+/*
+ * Read and process messages from a client.
+ *
+ * This task runs on the main pthread. It is scheduled by client pthreads when
+ * they have new messages available on their input queues. The client is passed
+ * as the task argument.
+ *
+ * Each message is popped off the client's input queue and the action associated
+ * with the message is executed. This proceeds until there are no more messages,
+ * an error occurs, or the processing limit is reached. In the last case, this
+ * task reschedules itself.
+ */
static int zserv_process_messages(struct thread *thread)
{
struct zserv *client = THREAD_ARG(thread);
struct stream *msg;
bool hdrvalid;
+ int p2p = zebrad.packets_to_process;
+
do {
- msg = stream_fifo_pop(client->ibuf_fifo);
+ pthread_mutex_lock(&client->ibuf_mtx);
+ {
+ msg = stream_fifo_pop(client->ibuf_fifo);
+ }
+ pthread_mutex_unlock(&client->ibuf_mtx);
/* break if out of messages */
if (!msg)
/* process commands */
zserv_handle_commands(client, &hdr, msg, zvrf);
- } while (msg);
+ } while (msg && --p2p);
+
+ /* reschedule self if necessary */
+ pthread_mutex_lock(&client->ibuf_mtx);
+ {
+ if (client->ibuf_fifo->count)
+ thread_add_event(zebrad.master, &zserv_process_messages,
+ client, 0, NULL);
+ }
+ pthread_mutex_unlock(&client->ibuf_mtx);
return 0;
}
-/* Handler of zebra service request. */
+/*
+ * Read and process data from a client socket.
+ *
+ * The responsibilities here are to read raw data from the client socket,
+ * validate the header, encapsulate it into a single stream object, push it
+ * onto the input queue and then notify the main thread that there is new data
+ * available.
+ *
+ * This function first looks for any data in the client structure's working
+ * input buffer. If data is present, it is assumed that reading stopped in a
+ * previous invocation of this task and needs to be resumed to finish a message.
+ * Otherwise, the socket data stream is assumed to be at the beginning of a new
+ * ZAPI message (specifically at the header). The header is read and validated.
+ * If the header passed validation then the length field found in the header is
+ * used to compute the total length of the message. That much data is read (but
+ * not inspected), appended to the header, placed into a stream and pushed onto
+ * the client's input queue. A task is then scheduled on the main thread to
+ * process the client's input queue. Finally, if all of this was successful,
+ * this task reschedules itself.
+ *
+ * Any failure in any of these actions is handled by terminating the client.
+ */
static int zserv_read(struct thread *thread)
{
int sock;
struct zserv *client;
size_t already;
#if defined(HANDLE_ZAPI_FUZZING)
- int packets = 1;
+ int p2p = 1;
#else
- int packets = zebrad.packets_to_process;
+ int p2p = zebrad.packets_to_process;
#endif
- /* Get thread data. Reset reading thread because I'm running. */
sock = THREAD_FD(thread);
client = THREAD_ARG(thread);
- if (client->t_suicide) {
- zebra_client_close(client);
- return -1;
- }
-
- while (packets) {
+ while (p2p--) {
struct zmsghdr hdr;
ssize_t nb;
bool hdrvalid;
stream_set_getp(client->ibuf_work, 0);
struct stream *msg = stream_dup(client->ibuf_work);
- stream_fifo_push(client->ibuf_fifo, msg);
-
- if (client->t_suicide)
- goto zread_fail;
+ pthread_mutex_lock(&client->ibuf_mtx);
+ {
+ stream_fifo_push(client->ibuf_fifo, msg);
+ }
+ pthread_mutex_unlock(&client->ibuf_mtx);
- --packets;
stream_reset(client->ibuf_work);
}
if (IS_ZEBRA_DEBUG_PACKET)
zlog_debug("Read %d packets",
- zebrad.packets_to_process - packets);
+ zebrad.packets_to_process - p2p);
/* Schedule job to process those packets */
thread_add_event(zebrad.master, &zserv_process_messages, client, 0,
{
switch (event) {
case ZEBRA_READ:
- thread_add_read(zebrad.master, zserv_read, client, client->sock,
- &client->t_read);
+ thread_add_read(client->pthread->master, zserv_read, client,
+ client->sock, &client->t_read);
break;
case ZEBRA_WRITE:
- thread_add_write(zebrad.master, zserv_write, client,
+ thread_add_write(client->pthread->master, zserv_write, client,
client->sock, &client->t_write);
break;
}
}
+/* Main thread lifecycle ----------------------------------------------------*/
+
/* Accept code of zebra server socket. */
static int zebra_accept(struct thread *thread)
{