From f82370b47bddb214d53ffb94775805d637300e9b Mon Sep 17 00:00:00 2001 From: Christian Hopps Date: Wed, 8 Mar 2023 17:11:43 -0500 Subject: [PATCH] mgmtd: lib: utilize msglib constructed from the removed code Signed-off-by: Christian Hopps --- lib/mgmt_be_client.c | 436 ++++++++-------------------------------- lib/mgmt_be_client.h | 14 +- lib/mgmt_fe_client.c | 413 +++++++------------------------------ lib/mgmt_fe_client.h | 15 +- mgmtd/mgmt_be_adapter.c | 313 ++++++---------------------- mgmtd/mgmt_be_adapter.h | 18 +- mgmtd/mgmt_be_server.c | 2 +- mgmtd/mgmt_fe_adapter.c | 328 ++++++------------------------ mgmtd/mgmt_fe_adapter.h | 15 +- mgmtd/mgmt_fe_server.c | 2 +- mgmtd/mgmt_txn.c | 52 +---- 11 files changed, 303 insertions(+), 1305 deletions(-) diff --git a/lib/mgmt_be_client.c b/lib/mgmt_be_client.c index 1e98c6123d..4dc7b296b8 100644 --- a/lib/mgmt_be_client.c +++ b/lib/mgmt_be_client.c @@ -9,6 +9,7 @@ #include "libfrr.h" #include "mgmtd/mgmt.h" #include "mgmt_be_client.h" +#include "mgmt_msg.h" #include "mgmt_pb.h" #include "network.h" #include "stream.h" @@ -111,14 +112,8 @@ struct mgmt_be_client_ctx { struct thread *conn_writes_on; struct thread *msg_proc_ev; uint32_t flags; - uint32_t num_msg_tx; - uint32_t num_msg_rx; - struct stream_fifo *ibuf_fifo; - struct stream *ibuf_work; - struct stream_fifo *obuf_fifo; - struct stream *obuf_work; - uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN]; + struct mgmt_msg_state mstate; struct nb_config *candidate_config; struct nb_config *running_config; @@ -143,7 +138,9 @@ struct mgmt_be_client_ctx { static bool mgmt_debug_be_client; -static struct mgmt_be_client_ctx mgmt_be_client_ctx = {0}; +static struct mgmt_be_client_ctx mgmt_be_client_ctx = { + .conn_fd = -1, +}; const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = { #ifdef HAVE_STATICD @@ -168,14 +165,13 @@ mgmt_be_server_disconnect(struct mgmt_be_client_ctx *client_ctx, { /* Notify client through registered callback (if any) */ if (client_ctx->client_params.client_connect_notify) - (void)(*client_ctx->client_params - .client_connect_notify)( + (void)(*client_ctx->client_params.client_connect_notify)( (uintptr_t)client_ctx, client_ctx->client_params.user_data, false); - if (client_ctx->conn_fd) { + if (client_ctx->conn_fd != -1) { close(client_ctx->conn_fd); - client_ctx->conn_fd = 0; + client_ctx->conn_fd = -1; } if (reconnect) @@ -881,181 +877,47 @@ mgmt_be_client_handle_msg(struct mgmt_be_client_ctx *client_ctx, return 0; } -static int -mgmt_be_client_process_msg(struct mgmt_be_client_ctx *client_ctx, - uint8_t *msg_buf, int bytes_read) +static void mgmt_be_client_process_msg(void *user_ctx, uint8_t *data, + size_t len) { + struct mgmt_be_client_ctx *client_ctx = user_ctx; Mgmtd__BeMessage *be_msg; - struct mgmt_be_msg *msg; - uint16_t bytes_left; - uint16_t processed = 0; - - MGMTD_BE_CLIENT_DBG( - "Got message of %d bytes from MGMTD Backend Server", - bytes_read); - - bytes_left = bytes_read; - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN; - bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { - msg = (struct mgmt_be_msg *)msg_buf; - if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) { - MGMTD_BE_CLIENT_DBG( - "Marker not found in message from MGMTD '%s'", - client_ctx->client_params.name); - break; - } - - if (bytes_left < msg->hdr.len) { - MGMTD_BE_CLIENT_DBG( - "Incomplete message of %d bytes (epxected: %u) from MGMTD '%s'", - bytes_left, msg->hdr.len, - client_ctx->client_params.name); - break; - } - - be_msg = mgmtd__be_message__unpack( - NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN), - msg->payload); - if (!be_msg) { - MGMTD_BE_CLIENT_DBG( - "Failed to decode %d bytes from MGMTD '%s'", - msg->hdr.len, client_ctx->client_params.name); - continue; - } - (void)mgmt_be_client_handle_msg(client_ctx, be_msg); - mgmtd__be_message__free_unpacked(be_msg, NULL); - processed++; - client_ctx->num_msg_rx++; + be_msg = mgmtd__be_message__unpack(NULL, len, data); + if (!be_msg) { + MGMTD_BE_CLIENT_DBG("Failed to decode %zu bytes from server", + len); + return; } - - return processed; + MGMTD_BE_CLIENT_DBG( + "Decoded %zu bytes of message(msg: %u/%u) from server", len, + be_msg->message_case, be_msg->message_case); + (void)mgmt_be_client_handle_msg(client_ctx, be_msg); + mgmtd__be_message__free_unpacked(be_msg, NULL); } static void mgmt_be_client_proc_msgbufs(struct thread *thread) { - struct mgmt_be_client_ctx *client_ctx; - struct stream *work; - int processed = 0; - - client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx); - - if (client_ctx->conn_fd == 0) - return; - - for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) { - work = stream_fifo_pop_safe(client_ctx->ibuf_fifo); - if (!work) - break; - - processed += mgmt_be_client_process_msg( - client_ctx, STREAM_DATA(work), stream_get_endp(work)); - - if (work != client_ctx->ibuf_work) { - /* Free it up */ - stream_free(work); - } else { - /* Reset stream buffer for next read */ - stream_reset(work); - } - } + struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread); - /* - * If we have more to process, reschedule for processing it. - */ - if (stream_fifo_head(client_ctx->ibuf_fifo)) - mgmt_be_client_register_event(client_ctx, - MGMTD_BE_PROC_MSG); + if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_be_client_process_msg, + client_ctx, mgmt_debug_be_client)) + mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); } static void mgmt_be_client_read(struct thread *thread) { - struct mgmt_be_client_ctx *client_ctx; - int bytes_read, msg_cnt; - size_t total_bytes, bytes_left; - struct mgmt_be_msg_hdr *msg_hdr; - bool incomplete = false; - - client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - total_bytes = 0; - bytes_left = STREAM_SIZE(client_ctx->ibuf_work) - - stream_get_endp(client_ctx->ibuf_work); - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { - bytes_read = stream_read_try(client_ctx->ibuf_work, - client_ctx->conn_fd, bytes_left); - MGMTD_BE_CLIENT_DBG( - "Got %d bytes of message from MGMTD Backend server", - bytes_read); - /* -2 is normal nothing read, and to retry */ - if (bytes_read == -2) - break; - if (bytes_read <= 0) { - if (bytes_read == 0) { - MGMTD_BE_CLIENT_ERR( - "Got EOF/disconnect while reading from MGMTD Frontend server."); - } else { - MGMTD_BE_CLIENT_ERR( - "Got error (%d) while reading from MGMTD Backend server. Err: '%s'", - bytes_read, safe_strerror(errno)); - } - mgmt_be_server_disconnect(client_ctx, true); - return; - } - total_bytes += bytes_read; - bytes_left -= bytes_read; - } - /* - * Check if we have read complete messages or not. - */ - stream_set_getp(client_ctx->ibuf_work, 0); - total_bytes = 0; - msg_cnt = 0; - bytes_left = stream_get_endp(client_ctx->ibuf_work); - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { - msg_hdr = (struct mgmt_be_msg_hdr - *)(STREAM_DATA(client_ctx->ibuf_work) - + total_bytes); - if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) { - /* Corrupted buffer. Force disconnect?? */ - MGMTD_BE_CLIENT_ERR( - "Received corrupted buffer from MGMTD backend server."); - mgmt_be_server_disconnect(client_ctx, true); - return; - } - if (msg_hdr->len > bytes_left) - break; - - total_bytes += msg_hdr->len; - bytes_left -= msg_hdr->len; - msg_cnt++; - } + struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread); + enum mgmt_msg_rsched rv; - if (!msg_cnt) - goto resched; - - if (bytes_left > 0) - incomplete = true; - - /* - * We have read one or several messages. - * Schedule processing them now. - */ - msg_hdr = - (struct mgmt_be_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) + - total_bytes); - stream_set_endp(client_ctx->ibuf_work, total_bytes); - stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work); - client_ctx->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - if (incomplete) { - stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left); - stream_set_endp(client_ctx->ibuf_work, bytes_left); + rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd, + mgmt_debug_be_client); + if (rv == MSR_DISCONNECT) { + mgmt_be_server_disconnect(client_ctx, true); + return; } - mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); - -resched: + if (rv == MSR_SCHED_BOTH) + mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); } @@ -1072,9 +934,7 @@ mgmt_be_client_writes_on(struct mgmt_be_client_ctx *client_ctx) { MGMTD_BE_CLIENT_DBG("Resume writing msgs"); UNSET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF); - if (client_ctx->obuf_work - || stream_fifo_count_safe(client_ctx->obuf_fifo)) - mgmt_be_client_sched_msg_write(client_ctx); + mgmt_be_client_sched_msg_write(client_ctx); } static inline void @@ -1085,99 +945,39 @@ mgmt_be_client_writes_off(struct mgmt_be_client_ctx *client_ctx) } static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, - Mgmtd__BeMessage *be_msg) + Mgmtd__BeMessage *be_msg) { - size_t msg_size; - uint8_t *msg_buf = client_ctx->msg_buf; - struct mgmt_be_msg *msg; - - if (client_ctx->conn_fd == 0) - return -1; - - msg_size = mgmtd__be_message__get_packed_size(be_msg); - msg_size += MGMTD_BE_MSG_HDR_LEN; - if (msg_size > MGMTD_BE_MSG_MAX_LEN) { - MGMTD_BE_CLIENT_ERR( - "Message size %d more than max size'%d. Not sending!'", - (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN); + if (client_ctx->conn_fd == -1) { + MGMTD_BE_CLIENT_DBG("can't send message on closed connection"); return -1; } - msg = (struct mgmt_be_msg *)msg_buf; - msg->hdr.marker = MGMTD_BE_MSG_MARKER; - msg->hdr.len = (uint16_t)msg_size; - mgmtd__be_message__pack(be_msg, msg->payload); - - if (!client_ctx->obuf_work) - client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) { - stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); - client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - } - stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size); - + int rv = mgmt_msg_send_msg( + &client_ctx->mstate, be_msg, + mgmtd__be_message__get_packed_size(be_msg), + (size_t(*)(void *, void *))mgmtd__be_message__pack, + mgmt_debug_be_client); mgmt_be_client_sched_msg_write(client_ctx); - client_ctx->num_msg_tx++; - return 0; + return rv; } static void mgmt_be_client_write(struct thread *thread) { - int bytes_written = 0; - int processed = 0; - int msg_size = 0; - struct stream *s = NULL; - struct stream *free = NULL; - struct mgmt_be_client_ctx *client_ctx; - - client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - /* Ensure pushing any pending write buffer to FIFO */ - if (client_ctx->obuf_work) { - stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); - client_ctx->obuf_work = NULL; - } - - for (s = stream_fifo_head(client_ctx->obuf_fifo); - s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE; - s = stream_fifo_head(client_ctx->obuf_fifo)) { - /* msg_size = (int)stream_get_size(s); */ - msg_size = (int)STREAM_READABLE(s); - bytes_written = stream_flush(s, client_ctx->conn_fd); - if (bytes_written == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_be_client_register_event( - client_ctx, MGMTD_BE_CONN_WRITE); - return; - } else if (bytes_written != msg_size) { - MGMTD_BE_CLIENT_ERR( - "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", - msg_size, bytes_written, safe_strerror(errno)); - if (bytes_written > 0) { - stream_forward_getp(s, (size_t)bytes_written); - stream_pulldown(s); - mgmt_be_client_register_event( - client_ctx, MGMTD_BE_CONN_WRITE); - return; - } - mgmt_be_server_disconnect(client_ctx, true); - return; - } - - free = stream_fifo_pop(client_ctx->obuf_fifo); - stream_free(free); - MGMTD_BE_CLIENT_DBG( - "Wrote %d bytes of message to MGMTD Backend client socket.'", - bytes_written); - processed++; - } - - if (s) { + struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread); + enum mgmt_msg_wsched rv; + + rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd, + mgmt_debug_be_client); + if (rv == MSW_SCHED_STREAM) + mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_WRITE); + else if (rv == MSW_DISCONNECT) + mgmt_be_server_disconnect(client_ctx, true); + else if (rv == MSW_SCHED_WRITES_OFF) { mgmt_be_client_writes_off(client_ctx); mgmt_be_client_register_event(client_ctx, - MGMTD_BE_CONN_WRITES_ON); - } + MGMTD_BE_CONN_WRITES_ON); + } else + assert(rv == MSW_SCHED_NONE); } static void mgmt_be_client_resume_writes(struct thread *thread) @@ -1185,15 +985,14 @@ static void mgmt_be_client_resume_writes(struct thread *thread) struct mgmt_be_client_ctx *client_ctx; client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); + assert(client_ctx && client_ctx->conn_fd != -1); mgmt_be_client_writes_on(client_ctx); } static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx, - bool subscr_xpaths, - uint16_t num_reg_xpaths, - char **reg_xpaths) + bool subscr_xpaths, uint16_t num_reg_xpaths, + char **reg_xpaths) { Mgmtd__BeMessage be_msg; Mgmtd__BeSubscribeReq subscr_req; @@ -1214,86 +1013,35 @@ static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx, return mgmt_be_client_send_msg(client_ctx, &be_msg); } -static int mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx) +static void mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx) { - int ret, sock, len; - struct sockaddr_un addr; - - MGMTD_BE_CLIENT_DBG("Trying to connect to MGMTD Backend server at %s", - MGMTD_BE_SERVER_PATH); - - assert(!client_ctx->conn_fd); + const char *dbgtag = mgmt_debug_be_client ? "BE-client" : NULL; - sock = socket(AF_UNIX, SOCK_STREAM, 0); - if (sock < 0) { - MGMTD_BE_CLIENT_ERR("Failed to create socket"); - goto mgmt_be_server_connect_failed; - } - - MGMTD_BE_CLIENT_DBG( - "Created MGMTD Backend server socket successfully!"); + assert(client_ctx->conn_fd == -1); + client_ctx->conn_fd = mgmt_msg_connect( + MGMTD_BE_SERVER_PATH, MGMTD_SOCKET_BE_SEND_BUF_SIZE, + MGMTD_SOCKET_BE_RECV_BUF_SIZE, dbgtag); - memset(&addr, 0, sizeof(struct sockaddr_un)); - addr.sun_family = AF_UNIX; - strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path)); -#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN - len = addr.sun_len = SUN_LEN(&addr); -#else - len = sizeof(addr.sun_family) + strlen(addr.sun_path); -#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */ - - ret = connect(sock, (struct sockaddr *)&addr, len); - if (ret < 0) { - MGMTD_BE_CLIENT_ERR( - "Failed to connect to MGMTD Backend Server at %s. Err: %s", - addr.sun_path, safe_strerror(errno)); - close(sock); - goto mgmt_be_server_connect_failed; + /* Send SUBSCRIBE_REQ message */ + if (client_ctx->conn_fd == -1 || + mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) { + mgmt_be_server_disconnect(client_ctx, true); + return; } - MGMTD_BE_CLIENT_DBG( - "Connected to MGMTD Backend Server at %s successfully!", - addr.sun_path); - client_ctx->conn_fd = sock; - - /* Make client socket non-blocking. */ - set_nonblocking(sock); - setsockopt_so_sendbuf(client_ctx->conn_fd, - MGMTD_SOCKET_BE_SEND_BUF_SIZE); - setsockopt_so_recvbuf(client_ctx->conn_fd, - MGMTD_SOCKET_BE_RECV_BUF_SIZE); - + /* Start reading from the socket */ mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); /* Notify client through registered callback (if any) */ if (client_ctx->client_params.client_connect_notify) - (void)(*client_ctx->client_params - .client_connect_notify)( + (void)(*client_ctx->client_params.client_connect_notify)( (uintptr_t)client_ctx, client_ctx->client_params.user_data, true); - - /* Send SUBSCRIBE_REQ message */ - if (mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) - goto mgmt_be_server_connect_failed; - - return 0; - -mgmt_be_server_connect_failed: - if (sock && sock != client_ctx->conn_fd) - close(sock); - - mgmt_be_server_disconnect(client_ctx, true); - return -1; } static void mgmt_be_client_conn_timeout(struct thread *thread) { - struct mgmt_be_client_ctx *client_ctx; - - client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx); - - mgmt_be_server_connect(client_ctx); + mgmt_be_server_connect(THREAD_ARG(thread)); } static void @@ -1317,16 +1065,15 @@ mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, break; case MGMTD_BE_PROC_MSG: tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC; - thread_add_timer_tv(client_ctx->tm, - mgmt_be_client_proc_msgbufs, client_ctx, - &tv, &client_ctx->msg_proc_ev); + thread_add_timer_tv(client_ctx->tm, mgmt_be_client_proc_msgbufs, + client_ctx, &tv, &client_ctx->msg_proc_ev); assert(client_ctx->msg_proc_ev); break; case MGMTD_BE_CONN_WRITES_ON: - thread_add_timer_msec( - client_ctx->tm, mgmt_be_client_resume_writes, - client_ctx, MGMTD_BE_MSG_WRITE_DELAY_MSEC, - &client_ctx->conn_writes_on); + thread_add_timer_msec(client_ctx->tm, + mgmt_be_client_resume_writes, client_ctx, + MGMTD_BE_MSG_WRITE_DELAY_MSEC, + &client_ctx->conn_writes_on); assert(client_ctx->conn_writes_on); break; case MGMTD_BE_SERVER: @@ -1376,16 +1123,10 @@ uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params, mgmt_be_client_ctx.client_params.conn_retry_intvl_sec = MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC; - assert(!mgmt_be_client_ctx.ibuf_fifo && !mgmt_be_client_ctx.ibuf_work && - !mgmt_be_client_ctx.obuf_fifo && !mgmt_be_client_ctx.obuf_work); - mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head); - mgmt_be_client_ctx.ibuf_fifo = stream_fifo_new(); - mgmt_be_client_ctx.ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - mgmt_be_client_ctx.obuf_fifo = stream_fifo_new(); - /* mgmt_be_client_ctx.obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - */ - mgmt_be_client_ctx.obuf_work = NULL; + mgmt_msg_init(&mgmt_be_client_ctx.mstate, MGMTD_BE_MAX_NUM_MSG_PROC, + MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN, + "BE-client"); /* Start trying to connect to MGMTD backend server immediately */ mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1); @@ -1465,18 +1206,11 @@ void mgmt_be_client_lib_destroy(uintptr_t lib_hndl) assert(client_ctx); MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'", - client_ctx->client_params.name); + client_ctx->client_params.name); mgmt_be_server_disconnect(client_ctx, false); - assert(mgmt_be_client_ctx.ibuf_fifo && mgmt_be_client_ctx.obuf_fifo); - - stream_fifo_free(mgmt_be_client_ctx.ibuf_fifo); - if (mgmt_be_client_ctx.ibuf_work) - stream_free(mgmt_be_client_ctx.ibuf_work); - stream_fifo_free(mgmt_be_client_ctx.obuf_fifo); - if (mgmt_be_client_ctx.obuf_work) - stream_free(mgmt_be_client_ctx.obuf_work); + mgmt_msg_destroy(&client_ctx->mstate); THREAD_OFF(client_ctx->conn_retry_tmr); THREAD_OFF(client_ctx->conn_read_ev); diff --git a/lib/mgmt_be_client.h b/lib/mgmt_be_client.h index bf875f5a45..66bc62fb08 100644 --- a/lib/mgmt_be_client.h +++ b/lib/mgmt_be_client.h @@ -12,6 +12,8 @@ extern "C" { #endif +#include "northbound.h" +#include "mgmt_pb.h" #include "mgmtd/mgmt_defines.h" /*************************************************************** @@ -80,18 +82,6 @@ enum mgmt_be_client_id { #define MGMTD_BE_MAX_CLIENTS_PER_XPATH_REG 32 -struct mgmt_be_msg_hdr { - uint16_t marker; - uint16_t len; /* Includes header */ -}; -#define MGMTD_BE_MSG_HDR_LEN sizeof(struct mgmt_be_msg_hdr) -#define MGMTD_BE_MSG_MARKER 0xfeed - -struct mgmt_be_msg { - struct mgmt_be_msg_hdr hdr; - uint8_t payload[]; -}; - struct mgmt_be_client_txn_ctx { uintptr_t *user_ctx; }; diff --git a/lib/mgmt_fe_client.c b/lib/mgmt_fe_client.c index 22c3a06b9e..73154401ec 100644 --- a/lib/mgmt_fe_client.c +++ b/lib/mgmt_fe_client.c @@ -9,6 +9,7 @@ #include "memory.h" #include "libfrr.h" #include "mgmt_fe_client.h" +#include "mgmt_msg.h" #include "mgmt_pb.h" #include "network.h" #include "stream.h" @@ -55,13 +56,8 @@ struct mgmt_fe_client_ctx { struct thread *conn_writes_on; struct thread *msg_proc_ev; uint32_t flags; - uint32_t num_msg_tx; - uint32_t num_msg_rx; - struct stream_fifo *ibuf_fifo; - struct stream *ibuf_work; - struct stream_fifo *obuf_fifo; - struct stream *obuf_work; + struct mgmt_msg_state mstate; struct mgmt_fe_client_params client_params; @@ -75,7 +71,9 @@ struct mgmt_fe_client_ctx { static bool mgmt_debug_fe_client; -static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {0}; +static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = { + .conn_fd = -1, +}; /* Forward declarations */ static void @@ -124,9 +122,9 @@ static void mgmt_fe_server_disconnect(struct mgmt_fe_client_ctx *client_ctx, bool reconnect) { - if (client_ctx->conn_fd) { + if (client_ctx->conn_fd != -1) { close(client_ctx->conn_fd); - client_ctx->conn_fd = 0; + client_ctx->conn_fd = -1; } if (reconnect) @@ -148,9 +146,7 @@ mgmt_fe_client_writes_on(struct mgmt_fe_client_ctx *client_ctx) { MGMTD_FE_CLIENT_DBG("Resume writing msgs"); UNSET_FLAG(client_ctx->flags, MGMTD_FE_CLIENT_FLAGS_WRITES_OFF); - if (client_ctx->obuf_work - || stream_fifo_count_safe(client_ctx->obuf_fifo)) - mgmt_fe_client_sched_msg_write(client_ctx); + mgmt_fe_client_sched_msg_write(client_ctx); } static inline void @@ -160,101 +156,42 @@ mgmt_fe_client_writes_off(struct mgmt_fe_client_ctx *client_ctx) MGMTD_FE_CLIENT_DBG("Paused writing msgs"); } -static int -mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx, - Mgmtd__FeMessage *fe_msg) +static int mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx, + Mgmtd__FeMessage *fe_msg) { - size_t msg_size; - uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN]; - struct mgmt_fe_msg *msg; - - if (client_ctx->conn_fd == 0) - return -1; - - msg_size = mgmtd__fe_message__get_packed_size(fe_msg); - msg_size += MGMTD_FE_MSG_HDR_LEN; - if (msg_size > sizeof(msg_buf)) { - MGMTD_FE_CLIENT_ERR( - "Message size %d more than max size'%d. Not sending!'", - (int)msg_size, (int)sizeof(msg_buf)); + /* users current expect this to fail here */ + if (client_ctx->conn_fd == -1) { + MGMTD_FE_CLIENT_DBG("can't send message on closed connection"); return -1; } - msg = (struct mgmt_fe_msg *)msg_buf; - msg->hdr.marker = MGMTD_FE_MSG_MARKER; - msg->hdr.len = (uint16_t)msg_size; - mgmtd__fe_message__pack(fe_msg, msg->payload); - - if (!client_ctx->obuf_work) - client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) { - stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); - client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - } - stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size); - + int rv = mgmt_msg_send_msg( + &client_ctx->mstate, fe_msg, + mgmtd__fe_message__get_packed_size(fe_msg), + (size_t(*)(void *, void *))mgmtd__fe_message__pack, + mgmt_debug_fe_client); mgmt_fe_client_sched_msg_write(client_ctx); - client_ctx->num_msg_tx++; - return 0; + return rv; } static void mgmt_fe_client_write(struct thread *thread) { - int bytes_written = 0; - int processed = 0; - int msg_size = 0; - struct stream *s = NULL; - struct stream *free = NULL; struct mgmt_fe_client_ctx *client_ctx; + enum mgmt_msg_wsched rv; client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - /* Ensure pushing any pending write buffer to FIFO */ - if (client_ctx->obuf_work) { - stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); - client_ctx->obuf_work = NULL; - } - - for (s = stream_fifo_head(client_ctx->obuf_fifo); - s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE; - s = stream_fifo_head(client_ctx->obuf_fifo)) { - /* msg_size = (int)stream_get_size(s); */ - msg_size = (int)STREAM_READABLE(s); - bytes_written = stream_flush(s, client_ctx->conn_fd); - if (bytes_written == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_fe_client_register_event( - client_ctx, MGMTD_FE_CONN_WRITE); - return; - } else if (bytes_written != msg_size) { - MGMTD_FE_CLIENT_ERR( - "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", - msg_size, bytes_written, safe_strerror(errno)); - if (bytes_written > 0) { - stream_forward_getp(s, (size_t)bytes_written); - stream_pulldown(s); - mgmt_fe_client_register_event( - client_ctx, MGMTD_FE_CONN_WRITE); - return; - } - mgmt_fe_server_disconnect(client_ctx, true); - return; - } - - free = stream_fifo_pop(client_ctx->obuf_fifo); - stream_free(free); - MGMTD_FE_CLIENT_DBG( - "Wrote %d bytes of message to MGMTD Backend client socket.'", - bytes_written); - processed++; - } - - if (s) { + rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd, + mgmt_debug_fe_client); + if (rv == MSW_SCHED_STREAM) + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_WRITE); + else if (rv == MSW_DISCONNECT) + mgmt_fe_server_disconnect(client_ctx, true); + else if (rv == MSW_SCHED_WRITES_OFF) { mgmt_fe_client_writes_off(client_ctx); mgmt_fe_client_register_event(client_ctx, - MGMTD_FE_CONN_WRITES_ON); - } + MGMTD_FE_CONN_WRITES_ON); + } else + assert(rv == MSW_SCHED_NONE); } static void mgmt_fe_client_resume_writes(struct thread *thread) @@ -262,7 +199,7 @@ static void mgmt_fe_client_resume_writes(struct thread *thread) struct mgmt_fe_client_ctx *client_ctx; client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); + assert(client_ctx && client_ctx->conn_fd != -1); mgmt_fe_client_writes_on(client_ctx); } @@ -713,271 +650,83 @@ mgmt_fe_client_handle_msg(struct mgmt_fe_client_ctx *client_ctx, return 0; } -static int -mgmt_fe_client_process_msg(struct mgmt_fe_client_ctx *client_ctx, - uint8_t *msg_buf, int bytes_read) +static void mgmt_fe_client_process_msg(void *user_ctx, uint8_t *data, + size_t len) { + struct mgmt_fe_client_ctx *client_ctx = user_ctx; Mgmtd__FeMessage *fe_msg; - struct mgmt_fe_msg *msg; - uint16_t bytes_left; - uint16_t processed = 0; - MGMTD_FE_CLIENT_DBG( - "Have %u bytes of messages from MGMTD Frontend server to .", - bytes_read); - - bytes_left = bytes_read; - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN; - bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { - msg = (struct mgmt_fe_msg *)msg_buf; - if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) { - MGMTD_FE_CLIENT_DBG( - "Marker not found in message from MGMTD Frontend server."); - break; - } - - if (bytes_left < msg->hdr.len) { - MGMTD_FE_CLIENT_DBG( - "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend server.", - bytes_left, msg->hdr.len); - break; - } - - fe_msg = mgmtd__fe_message__unpack( - NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN), - msg->payload); - if (!fe_msg) { - MGMTD_FE_CLIENT_DBG( - "Failed to decode %d bytes from MGMTD Frontend server.", - msg->hdr.len); - continue; - } - - MGMTD_FE_CLIENT_DBG( - "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend server", - msg->hdr.len, fe_msg->message_case, - fe_msg->message_case); - - (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg); - - mgmtd__fe_message__free_unpacked(fe_msg, NULL); - processed++; - client_ctx->num_msg_rx++; + fe_msg = mgmtd__fe_message__unpack(NULL, len, data); + if (!fe_msg) { + MGMTD_FE_CLIENT_DBG("Failed to decode %zu bytes from server.", + len); + return; } - - return processed; + MGMTD_FE_CLIENT_DBG( + "Decoded %zu bytes of message(msg: %u/%u) from server", len, + fe_msg->message_case, fe_msg->message_case); + (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg); + mgmtd__fe_message__free_unpacked(fe_msg, NULL); } static void mgmt_fe_client_proc_msgbufs(struct thread *thread) { struct mgmt_fe_client_ctx *client_ctx; - struct stream *work; - int processed = 0; client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) { - work = stream_fifo_pop_safe(client_ctx->ibuf_fifo); - if (!work) - break; - - processed += mgmt_fe_client_process_msg( - client_ctx, STREAM_DATA(work), stream_get_endp(work)); - - if (work != client_ctx->ibuf_work) { - /* Free it up */ - stream_free(work); - } else { - /* Reset stream buffer for next read */ - stream_reset(work); - } - } - - /* - * If we have more to process, reschedule for processing it. - */ - if (stream_fifo_head(client_ctx->ibuf_fifo)) - mgmt_fe_client_register_event(client_ctx, - MGMTD_FE_PROC_MSG); + if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_fe_client_process_msg, + client_ctx, mgmt_debug_fe_client)) + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG); } static void mgmt_fe_client_read(struct thread *thread) { struct mgmt_fe_client_ctx *client_ctx; - int bytes_read, msg_cnt; - size_t total_bytes, bytes_left; - struct mgmt_fe_msg_hdr *msg_hdr; - bool incomplete = false; + enum mgmt_msg_rsched rv; client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - total_bytes = 0; - bytes_left = STREAM_SIZE(client_ctx->ibuf_work) - - stream_get_endp(client_ctx->ibuf_work); - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { - bytes_read = stream_read_try(client_ctx->ibuf_work, - client_ctx->conn_fd, bytes_left); - MGMTD_FE_CLIENT_DBG( - "Got %d bytes of message from MGMTD Frontend server", - bytes_read); - /* -2 is normal nothing read, and to retry */ - if (bytes_read == -2) - break; - if (bytes_read <= 0) { - if (bytes_read == 0) { - MGMTD_FE_CLIENT_ERR( - "Got EOF/disconnect while reading from MGMTD Frontend server."); - } else { - /* Fatal error */ - MGMTD_FE_CLIENT_ERR( - "Got error (%d) while reading from MGMTD Frontend server. Err: '%s'", - bytes_read, safe_strerror(errno)); - } - mgmt_fe_server_disconnect(client_ctx, true); - return; - } - total_bytes += bytes_read; - bytes_left -= bytes_read; - } - - /* - * Check if we have read complete messages or not. - */ - stream_set_getp(client_ctx->ibuf_work, 0); - total_bytes = 0; - msg_cnt = 0; - bytes_left = stream_get_endp(client_ctx->ibuf_work); - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { - msg_hdr = (struct mgmt_fe_msg_hdr - *)(STREAM_DATA(client_ctx->ibuf_work) - + total_bytes); - if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) { - /* Corrupted buffer. Force disconnect?? */ - MGMTD_FE_CLIENT_ERR( - "Received corrupted buffer from MGMTD frontend server."); - mgmt_fe_server_disconnect(client_ctx, true); - return; - } - if (msg_hdr->len > bytes_left) - break; - - total_bytes += msg_hdr->len; - bytes_left -= msg_hdr->len; - msg_cnt++; - } - - if (!msg_cnt) - goto resched; - - if (bytes_left > 0) - incomplete = true; - /* - * We have read one or several messages. - * Schedule processing them now. - */ - msg_hdr = - (struct mgmt_fe_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) - + total_bytes); - stream_set_endp(client_ctx->ibuf_work, total_bytes); - stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work); - client_ctx->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - if (incomplete) { - stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left); - stream_set_endp(client_ctx->ibuf_work, bytes_left); + rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd, + mgmt_debug_fe_client); + if (rv == MSR_DISCONNECT) { + mgmt_fe_server_disconnect(client_ctx, true); + return; } - - mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG); - -resched: + if (rv == MSR_SCHED_BOTH) + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG); mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ); } -static int mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx) +static void mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx) { - int ret, sock, len; - struct sockaddr_un addr; - - MGMTD_FE_CLIENT_DBG( - "Trying to connect to MGMTD Frontend server at %s", - MGMTD_FE_SERVER_PATH); - - assert(!client_ctx->conn_fd); + const char *dbgtag = mgmt_debug_fe_client ? "FE-client" : NULL; - sock = socket(AF_UNIX, SOCK_STREAM, 0); - if (sock < 0) { - MGMTD_FE_CLIENT_ERR("Failed to create socket"); - goto mgmt_fe_server_connect_failed; - } + assert(client_ctx->conn_fd == -1); + client_ctx->conn_fd = mgmt_msg_connect( + MGMTD_FE_SERVER_PATH, MGMTD_SOCKET_FE_SEND_BUF_SIZE, + MGMTD_SOCKET_FE_RECV_BUF_SIZE, dbgtag); - MGMTD_FE_CLIENT_DBG( - "Created MGMTD Frontend server socket successfully!"); - - memset(&addr, 0, sizeof(struct sockaddr_un)); - addr.sun_family = AF_UNIX; - strlcpy(addr.sun_path, MGMTD_FE_SERVER_PATH, sizeof(addr.sun_path)); -#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN - len = addr.sun_len = SUN_LEN(&addr); -#else - len = sizeof(addr.sun_family) + strlen(addr.sun_path); -#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */ - - ret = connect(sock, (struct sockaddr *)&addr, len); - if (ret < 0) { - MGMTD_FE_CLIENT_ERR( - "Failed to connect to MGMTD Frontend Server at %s. Err: %s", - addr.sun_path, safe_strerror(errno)); - close(sock); - goto mgmt_fe_server_connect_failed; + /* Send REGISTER_REQ message */ + if (client_ctx->conn_fd == -1 || + mgmt_fe_send_register_req(client_ctx) != 0) { + mgmt_fe_server_disconnect(client_ctx, true); + return; } - MGMTD_FE_CLIENT_DBG( - "Connected to MGMTD Frontend Server at %s successfully!", - addr.sun_path); - client_ctx->conn_fd = sock; - - /* Make client socket non-blocking. */ - set_nonblocking(sock); - setsockopt_so_sendbuf(client_ctx->conn_fd, - MGMTD_SOCKET_FE_SEND_BUF_SIZE); - setsockopt_so_recvbuf(client_ctx->conn_fd, - MGMTD_SOCKET_FE_RECV_BUF_SIZE); - - thread_add_read(client_ctx->tm, mgmt_fe_client_read, - (void *)&mgmt_fe_client_ctx, client_ctx->conn_fd, - &client_ctx->conn_read_ev); - assert(client_ctx->conn_read_ev); - - /* Send REGISTER_REQ message */ - if (mgmt_fe_send_register_req(client_ctx) != 0) - goto mgmt_fe_server_connect_failed; + /* Start reading from the socket */ + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ); /* Notify client through registered callback (if any) */ if (client_ctx->client_params.client_connect_notify) (void)(*client_ctx->client_params.client_connect_notify)( (uintptr_t)client_ctx, client_ctx->client_params.user_data, true); - - return 0; - -mgmt_fe_server_connect_failed: - if (sock && sock != client_ctx->conn_fd) - close(sock); - - mgmt_fe_server_disconnect(client_ctx, true); - return -1; } + static void mgmt_fe_client_conn_timeout(struct thread *thread) { - struct mgmt_fe_client_ctx *client_ctx; - - client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx); - - mgmt_fe_server_connect(client_ctx); + mgmt_fe_server_connect(THREAD_ARG(thread)); } static void @@ -1046,16 +795,9 @@ uintptr_t mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params, mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec = MGMTD_FE_DEFAULT_CONN_RETRY_INTVL_SEC; - assert(!mgmt_fe_client_ctx.ibuf_fifo - && !mgmt_fe_client_ctx.ibuf_work - && !mgmt_fe_client_ctx.obuf_fifo - && !mgmt_fe_client_ctx.obuf_work); - - mgmt_fe_client_ctx.ibuf_fifo = stream_fifo_new(); - mgmt_fe_client_ctx.ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - mgmt_fe_client_ctx.obuf_fifo = stream_fifo_new(); - - mgmt_fe_client_ctx.obuf_work = NULL; + mgmt_msg_init(&mgmt_fe_client_ctx.mstate, MGMTD_FE_MAX_NUM_MSG_PROC, + MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN, + "FE-client"); mgmt_sessions_init(&mgmt_fe_client_ctx.client_sessions); @@ -1322,8 +1064,6 @@ void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl) mgmt_fe_server_disconnect(client_ctx, false); - assert(mgmt_fe_client_ctx.ibuf_fifo && mgmt_fe_client_ctx.obuf_fifo); - mgmt_fe_destroy_client_sessions(lib_hndl); THREAD_OFF(client_ctx->conn_retry_tmr); @@ -1331,10 +1071,5 @@ void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl) THREAD_OFF(client_ctx->conn_write_ev); THREAD_OFF(client_ctx->conn_writes_on); THREAD_OFF(client_ctx->msg_proc_ev); - stream_fifo_free(mgmt_fe_client_ctx.ibuf_fifo); - if (mgmt_fe_client_ctx.ibuf_work) - stream_free(mgmt_fe_client_ctx.ibuf_work); - stream_fifo_free(mgmt_fe_client_ctx.obuf_fifo); - if (mgmt_fe_client_ctx.obuf_work) - stream_free(mgmt_fe_client_ctx.obuf_work); + mgmt_msg_destroy(&client_ctx->mstate); } diff --git a/lib/mgmt_fe_client.h b/lib/mgmt_fe_client.h index 4ebecca215..ac29b8f27c 100644 --- a/lib/mgmt_fe_client.h +++ b/lib/mgmt_fe_client.h @@ -12,8 +12,9 @@ extern "C" { #endif -#include "mgmtd/mgmt_defines.h" #include "mgmt_pb.h" +#include "thread.h" +#include "mgmtd/mgmt_defines.h" /*************************************************************** * Macros @@ -55,18 +56,6 @@ extern "C" { #define MGMTD_DS_OPERATIONAL MGMTD__DATASTORE_ID__OPERATIONAL_DS #define MGMTD_DS_MAX_ID MGMTD_DS_OPERATIONAL + 1 -struct mgmt_fe_msg_hdr { - uint16_t marker; - uint16_t len; /* Includes header */ -}; -#define MGMTD_FE_MSG_HDR_LEN sizeof(struct mgmt_fe_msg_hdr) -#define MGMTD_FE_MSG_MARKER 0xdeaf - -struct mgmt_fe_msg { - struct mgmt_fe_msg_hdr hdr; - uint8_t payload[]; -}; - /* * All the client specific information this library needs to * initialize itself, setup connection with MGMTD FrontEnd interface diff --git a/mgmtd/mgmt_be_adapter.c b/mgmtd/mgmt_be_adapter.c index 8ad4064298..c57fa081a9 100644 --- a/mgmtd/mgmt_be_adapter.c +++ b/mgmtd/mgmt_be_adapter.c @@ -11,6 +11,7 @@ #include "sockopt.h" #include "network.h" #include "libfrr.h" +#include "mgmt_msg.h" #include "mgmt_pb.h" #include "mgmtd/mgmt.h" #include "mgmtd/mgmt_memory.h" @@ -497,8 +498,7 @@ mgmt_be_adapter_writes_on(struct mgmt_be_client_adapter *adapter) { MGMTD_BE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name); UNSET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF); - if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo)) - mgmt_be_adapter_sched_msg_write(adapter); + mgmt_be_adapter_sched_msg_write(adapter); } static inline void @@ -509,40 +509,20 @@ mgmt_be_adapter_writes_off(struct mgmt_be_client_adapter *adapter) } static int mgmt_be_adapter_send_msg(struct mgmt_be_client_adapter *adapter, - Mgmtd__BeMessage *be_msg) + Mgmtd__BeMessage *be_msg) { - size_t msg_size; - uint8_t *msg_buf = adapter->msg_buf; - struct mgmt_be_msg *msg; - - if (adapter->conn_fd < 0) - return -1; - - msg_size = mgmtd__be_message__get_packed_size(be_msg); - msg_size += MGMTD_BE_MSG_HDR_LEN; - if (msg_size > MGMTD_BE_MSG_MAX_LEN) { - MGMTD_BE_ADAPTER_ERR( - "Message size %d more than max size'%d. Not sending!'", - (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN); + if (adapter->conn_fd == -1) { + MGMTD_BE_ADAPTER_DBG("can't send message on closed connection"); return -1; } - msg = (struct mgmt_be_msg *)msg_buf; - msg->hdr.marker = MGMTD_BE_MSG_MARKER; - msg->hdr.len = (uint16_t)msg_size; - mgmtd__be_message__pack(be_msg, msg->payload); - - if (!adapter->obuf_work) - adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) { - stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); - adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - } - stream_write(adapter->obuf_work, (void *)msg_buf, msg_size); - + int rv = mgmt_msg_send_msg( + &adapter->mstate, be_msg, + mgmtd__be_message__get_packed_size(be_msg), + (size_t(*)(void *, void *))mgmtd__be_message__pack, + mgmt_debug_be); mgmt_be_adapter_sched_msg_write(adapter); - adapter->num_msg_tx++; - return 0; + return rv; } static int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter, @@ -614,239 +594,67 @@ static int mgmt_be_send_cfgapply_req(struct mgmt_be_client_adapter *adapter, return mgmt_be_adapter_send_msg(adapter, &be_msg); } -static uint16_t -mgmt_be_adapter_process_msg(struct mgmt_be_client_adapter *adapter, - uint8_t *msg_buf, uint16_t bytes_read) +static void mgmt_be_adapter_process_msg(void *user_ctx, uint8_t *data, + size_t len) { + struct mgmt_be_client_adapter *adapter = user_ctx; Mgmtd__BeMessage *be_msg; - struct mgmt_be_msg *msg; - uint16_t bytes_left; - uint16_t processed = 0; - - bytes_left = bytes_read; - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN; - bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { - msg = (struct mgmt_be_msg *)msg_buf; - if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) { - MGMTD_BE_ADAPTER_DBG( - "Marker not found in message from MGMTD Backend adapter '%s'", - adapter->name); - break; - } - - if (bytes_left < msg->hdr.len) { - MGMTD_BE_ADAPTER_DBG( - "Incomplete message of %d bytes (epxected: %u) from MGMTD Backend adapter '%s'", - bytes_left, msg->hdr.len, adapter->name); - break; - } - be_msg = mgmtd__be_message__unpack( - NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN), - msg->payload); - if (!be_msg) { - MGMTD_BE_ADAPTER_DBG( - "Failed to decode %d bytes from MGMTD Backend adapter '%s'", - msg->hdr.len, adapter->name); - continue; - } - - (void)mgmt_be_adapter_handle_msg(adapter, be_msg); - mgmtd__be_message__free_unpacked(be_msg, NULL); - processed++; - adapter->num_msg_rx++; + be_msg = mgmtd__be_message__unpack(NULL, len, data); + if (!be_msg) { + MGMTD_BE_ADAPTER_DBG( + "Failed to decode %zu bytes for adapter: %s", len, + adapter->name); + return; } - - return processed; + MGMTD_BE_ADAPTER_DBG("Decoded %zu bytes of message: %u for adapter: %s", + len, be_msg->message_case, adapter->name); + (void)mgmt_be_adapter_handle_msg(adapter, be_msg); + mgmtd__be_message__free_unpacked(be_msg, NULL); } static void mgmt_be_adapter_proc_msgbufs(struct thread *thread) { - struct mgmt_be_client_adapter *adapter; - struct stream *work; - int processed = 0; - - adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); - assert(adapter); - - if (adapter->conn_fd < 0) - return; - - for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) { - work = stream_fifo_pop_safe(adapter->ibuf_fifo); - if (!work) - break; - - processed += mgmt_be_adapter_process_msg( - adapter, STREAM_DATA(work), stream_get_endp(work)); - - if (work != adapter->ibuf_work) { - /* Free it up */ - stream_free(work); - } else { - /* Reset stream buffer for next read */ - stream_reset(work); - } - } + struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread); - /* - * If we have more to process, reschedule for processing it. - */ - if (stream_fifo_head(adapter->ibuf_fifo)) + if (mgmt_msg_procbufs(&adapter->mstate, mgmt_be_adapter_process_msg, + adapter, mgmt_debug_be)) mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG); } static void mgmt_be_adapter_read(struct thread *thread) { struct mgmt_be_client_adapter *adapter; - int bytes_read, msg_cnt; - size_t total_bytes, bytes_left; - struct mgmt_be_msg_hdr *msg_hdr; - bool incomplete = false; + enum mgmt_msg_rsched rv; adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd >= 0); - total_bytes = 0; - bytes_left = STREAM_SIZE(adapter->ibuf_work) - - stream_get_endp(adapter->ibuf_work); - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { - bytes_read = stream_read_try(adapter->ibuf_work, - adapter->conn_fd, bytes_left); - MGMTD_BE_ADAPTER_DBG( - "Got %d bytes of message from MGMTD Backend adapter '%s'", - bytes_read, adapter->name); - if (bytes_read <= 0) { - if (bytes_read == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_be_adapter_register_event( - adapter, MGMTD_BE_CONN_READ); - return; - } - - if (!bytes_read) { - /* Looks like connection closed */ - MGMTD_BE_ADAPTER_ERR( - "Got error (%d) while reading from MGMTD Backend adapter '%s'. Err: '%s'", - bytes_read, adapter->name, - safe_strerror(errno)); - mgmt_be_adapter_disconnect(adapter); - return; - } - break; - } - - total_bytes += bytes_read; - bytes_left -= bytes_read; - } - - /* - * Check if we would have read incomplete messages or not. - */ - stream_set_getp(adapter->ibuf_work, 0); - total_bytes = 0; - msg_cnt = 0; - bytes_left = stream_get_endp(adapter->ibuf_work); - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { - msg_hdr = - (struct mgmt_be_msg_hdr *)(STREAM_DATA( - adapter->ibuf_work) - + total_bytes); - if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) { - /* Corrupted buffer. Force disconnect?? */ - MGMTD_BE_ADAPTER_ERR( - "Received corrupted buffer from MGMTD Backend client."); - mgmt_be_adapter_disconnect(adapter); - return; - } - if (msg_hdr->len > bytes_left) - break; - - total_bytes += msg_hdr->len; - bytes_left -= msg_hdr->len; - msg_cnt++; - } - - if (bytes_left > 0) - incomplete = true; - - /* - * We would have read one or several messages. - * Schedule processing them now. - */ - msg_hdr = (struct mgmt_be_msg_hdr *)(STREAM_DATA(adapter->ibuf_work) - + total_bytes); - stream_set_endp(adapter->ibuf_work, total_bytes); - stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work); - adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - if (incomplete) { - stream_put(adapter->ibuf_work, msg_hdr, bytes_left); - stream_set_endp(adapter->ibuf_work, bytes_left); + rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_be); + if (rv == MSR_DISCONNECT) { + mgmt_be_adapter_disconnect(adapter); + return; } - - if (msg_cnt) + if (rv == MSR_SCHED_BOTH) mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG); - mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ); } static void mgmt_be_adapter_write(struct thread *thread) { - int bytes_written = 0; - int processed = 0; - int msg_size = 0; - struct stream *s = NULL; - struct stream *free = NULL; - struct mgmt_be_client_adapter *adapter; + struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread); + enum mgmt_msg_wsched rv; - adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd >= 0); - - /* Ensure pushing any pending write buffer to FIFO */ - if (adapter->obuf_work) { - stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); - adapter->obuf_work = NULL; - } - - for (s = stream_fifo_head(adapter->obuf_fifo); - s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE; - s = stream_fifo_head(adapter->obuf_fifo)) { - /* msg_size = (int)stream_get_size(s); */ - msg_size = (int)STREAM_READABLE(s); - bytes_written = stream_flush(s, adapter->conn_fd); - if (bytes_written == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_be_adapter_register_event(adapter, - MGMTD_BE_CONN_WRITE); - return; - } else if (bytes_written != msg_size) { - MGMTD_BE_ADAPTER_ERR( - "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", - msg_size, bytes_written, safe_strerror(errno)); - if (bytes_written > 0) { - stream_forward_getp(s, (size_t)bytes_written); - stream_pulldown(s); - mgmt_be_adapter_register_event( - adapter, MGMTD_BE_CONN_WRITE); - return; - } - mgmt_be_adapter_disconnect(adapter); - return; - } - - free = stream_fifo_pop(adapter->obuf_fifo); - stream_free(free); - MGMTD_BE_ADAPTER_DBG( - "Wrote %d bytes of message to MGMTD Backend client socket.'", - bytes_written); - processed++; - } - - if (s) { + rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_be); + if (rv == MSW_SCHED_STREAM) + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_WRITE); + else if (rv == MSW_DISCONNECT) + mgmt_be_adapter_disconnect(adapter); + else if (rv == MSW_SCHED_WRITES_OFF) { mgmt_be_adapter_writes_off(adapter); mgmt_be_adapter_register_event(adapter, - MGMTD_BE_CONN_WRITES_ON); - } + MGMTD_BE_CONN_WRITES_ON); + } else + assert(rv == MSW_SCHED_NONE); } static void mgmt_be_adapter_resume_writes(struct thread *thread) @@ -936,6 +744,14 @@ mgmt_be_adapter_register_event(struct mgmt_be_client_adapter *adapter, assert(adapter->conn_read_ev); break; case MGMTD_BE_CONN_WRITE: + if (adapter->conn_write_ev) + MGMTD_BE_ADAPTER_DBG( + "write ready notify already set for client %s", + adapter->name); + else + MGMTD_BE_ADAPTER_DBG( + "scheduling write ready notify for client %s", + adapter->name); thread_add_write(mgmt_be_adapter_tm, mgmt_be_adapter_write, adapter, adapter->conn_fd, &adapter->conn_write_ev); assert(adapter->conn_write_ev); @@ -976,17 +792,12 @@ extern void mgmt_be_adapter_unlock(struct mgmt_be_client_adapter **adapter) (*adapter)->refcount--; if (!(*adapter)->refcount) { mgmt_be_adapters_del(&mgmt_be_adapters, *adapter); - - stream_fifo_free((*adapter)->ibuf_fifo); - stream_free((*adapter)->ibuf_work); - stream_fifo_free((*adapter)->obuf_fifo); - stream_free((*adapter)->obuf_work); - THREAD_OFF((*adapter)->conn_init_ev); THREAD_OFF((*adapter)->conn_read_ev); THREAD_OFF((*adapter)->conn_write_ev); THREAD_OFF((*adapter)->conn_writes_on); THREAD_OFF((*adapter)->proc_msg_ev); + mgmt_msg_destroy(&(*adapter)->mstate); XFREE(MTYPE_MGMTD_BE_ADPATER, *adapter); } @@ -1029,11 +840,9 @@ mgmt_be_create_adapter(int conn_fd, union sockunion *from) memcpy(&adapter->conn_su, from, sizeof(adapter->conn_su)); snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d", adapter->conn_fd); - adapter->ibuf_fifo = stream_fifo_new(); - adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - adapter->obuf_fifo = stream_fifo_new(); - /* adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); */ - adapter->obuf_work = NULL; + mgmt_msg_init(&adapter->mstate, MGMTD_BE_MAX_NUM_MSG_PROC, + MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN, + "BE-adapter"); mgmt_be_adapter_lock(adapter); mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ); @@ -1195,8 +1004,14 @@ void mgmt_be_adapter_status_write(struct vty *vty) vty_out(vty, " Conn-FD: \t\t\t%d\n", adapter->conn_fd); vty_out(vty, " Client-Id: \t\t\t%d\n", adapter->id); vty_out(vty, " Ref-Count: \t\t\t%u\n", adapter->refcount); - vty_out(vty, " Msg-Sent: \t\t\t%u\n", adapter->num_msg_tx); - vty_out(vty, " Msg-Recvd: \t\t\t%u\n", adapter->num_msg_rx); + vty_out(vty, " Msg-Recvd: \t\t\t%" PRIu64 "\n", + adapter->mstate.nrxm); + vty_out(vty, " Bytes-Recvd: \t\t%" PRIu64 "\n", + adapter->mstate.nrxb); + vty_out(vty, " Msg-Sent: \t\t\t%" PRIu64 "\n", + adapter->mstate.ntxm); + vty_out(vty, " Bytes-Sent: \t\t%" PRIu64 "\n", + adapter->mstate.ntxb); } vty_out(vty, " Total: %d\n", (int)mgmt_be_adapters_count(&mgmt_be_adapters)); diff --git a/mgmtd/mgmt_be_adapter.h b/mgmtd/mgmt_be_adapter.h index 5dfc2386da..7f57233d35 100644 --- a/mgmtd/mgmt_be_adapter.h +++ b/mgmtd/mgmt_be_adapter.h @@ -9,8 +9,9 @@ #ifndef _FRR_MGMTD_BE_ADAPTER_H_ #define _FRR_MGMTD_BE_ADAPTER_H_ -#include "mgmtd/mgmt_defines.h" #include "mgmt_be_client.h" +#include "mgmt_msg.h" +#include "mgmtd/mgmt_defines.h" #include "mgmtd/mgmt_ds.h" #define MGMTD_BE_CONN_INIT_DELAY_MSEC 50 @@ -54,22 +55,9 @@ struct mgmt_be_client_adapter { char xpath_reg[MGMTD_MAX_NUM_XPATH_REG][MGMTD_MAX_XPATH_LEN]; /* IO streams for read and write */ - /* pthread_mutex_t ibuf_mtx; */ - struct stream_fifo *ibuf_fifo; - /* pthread_mutex_t obuf_mtx; */ - struct stream_fifo *obuf_fifo; - - /* Private I/O buffers */ - struct stream *ibuf_work; - struct stream *obuf_work; - uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN]; - - /* Buffer of data waiting to be written to client. */ - /* struct buffer *wb; */ + struct mgmt_msg_state mstate; int refcount; - uint32_t num_msg_tx; - uint32_t num_msg_rx; /* * List of config items that should be sent to the diff --git a/mgmtd/mgmt_be_server.c b/mgmtd/mgmt_be_server.c index 6464b12ae3..6997fdcf81 100644 --- a/mgmtd/mgmt_be_server.c +++ b/mgmtd/mgmt_be_server.c @@ -28,7 +28,7 @@ zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) #endif /* REDIRECT_DEBUG_TO_STDERR */ -static int mgmt_be_listen_fd; +static int mgmt_be_listen_fd = -1; static struct thread_master *mgmt_be_listen_tm; static struct thread *mgmt_be_listen_ev; static void mgmt_be_server_register_event(enum mgmt_be_event event); diff --git a/mgmtd/mgmt_fe_adapter.c b/mgmtd/mgmt_fe_adapter.c index 1ea812c1a7..cc812ab150 100644 --- a/mgmtd/mgmt_fe_adapter.c +++ b/mgmtd/mgmt_fe_adapter.c @@ -11,6 +11,7 @@ #include "network.h" #include "libfrr.h" #include "mgmt_fe_client.h" +#include "mgmt_msg.h" #include "mgmt_pb.h" #include "hash.h" #include "jhash.h" @@ -375,8 +376,7 @@ mgmt_fe_adapter_writes_on(struct mgmt_fe_client_adapter *adapter) { MGMTD_FE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name); UNSET_FLAG(adapter->flags, MGMTD_FE_ADAPTER_FLAGS_WRITES_OFF); - if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo)) - mgmt_fe_adapter_sched_msg_write(adapter); + mgmt_fe_adapter_sched_msg_write(adapter); } static inline void @@ -390,40 +390,18 @@ static int mgmt_fe_adapter_send_msg(struct mgmt_fe_client_adapter *adapter, Mgmtd__FeMessage *fe_msg) { - size_t msg_size; - uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN]; - struct mgmt_fe_msg *msg; - - if (adapter->conn_fd < 0) { - MGMTD_FE_ADAPTER_ERR("Connection already reset"); - return -1; - } - - msg_size = mgmtd__fe_message__get_packed_size(fe_msg); - msg_size += MGMTD_FE_MSG_HDR_LEN; - if (msg_size > sizeof(msg_buf)) { - MGMTD_FE_ADAPTER_ERR( - "Message size %d more than max size'%d. Not sending!'", - (int)msg_size, (int)sizeof(msg_buf)); + if (adapter->conn_fd == -1) { + MGMTD_FE_ADAPTER_DBG("can't send message on closed connection"); return -1; } - msg = (struct mgmt_fe_msg *)msg_buf; - msg->hdr.marker = MGMTD_FE_MSG_MARKER; - msg->hdr.len = (uint16_t)msg_size; - mgmtd__fe_message__pack(fe_msg, msg->payload); - - if (!adapter->obuf_work) - adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) { - stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); - adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - } - stream_write(adapter->obuf_work, (void *)msg_buf, msg_size); - + int rv = mgmt_msg_send_msg( + &adapter->mstate, fe_msg, + mgmtd__fe_message__get_packed_size(fe_msg), + (size_t(*)(void *, void *))mgmtd__fe_message__pack, + mgmt_debug_fe); mgmt_fe_adapter_sched_msg_write(adapter); - adapter->num_msg_tx++; - return 0; + return rv; } static int @@ -1244,6 +1222,9 @@ static int mgmt_fe_session_handle_commit_config_req_msg( "Failed to create a Configuration session!"); return 0; } + MGMTD_FE_ADAPTER_DBG( + "Created txn %llu for session %llu for COMMIT-CFG-REQ", + session->cfg_txn_id, session->session_id); } @@ -1430,252 +1411,66 @@ mgmt_fe_adapter_handle_msg(struct mgmt_fe_client_adapter *adapter, return 0; } -static uint16_t -mgmt_fe_adapter_process_msg(struct mgmt_fe_client_adapter *adapter, - uint8_t *msg_buf, uint16_t bytes_read) +static void mgmt_fe_adapter_process_msg(void *user_ctx, uint8_t *data, + size_t len) { + struct mgmt_fe_client_adapter *adapter = user_ctx; Mgmtd__FeMessage *fe_msg; - struct mgmt_fe_msg *msg; - uint16_t bytes_left; - uint16_t processed = 0; - - MGMTD_FE_ADAPTER_DBG( - "Have %u bytes of messages from client '%s' to process", - bytes_read, adapter->name); - - bytes_left = bytes_read; - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN; - bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { - msg = (struct mgmt_fe_msg *)msg_buf; - if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) { - MGMTD_FE_ADAPTER_DBG( - "Marker not found in message from MGMTD Frontend adapter '%s'", - adapter->name); - break; - } - - if (bytes_left < msg->hdr.len) { - MGMTD_FE_ADAPTER_DBG( - "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend adapter '%s'", - bytes_left, msg->hdr.len, adapter->name); - break; - } - - fe_msg = mgmtd__fe_message__unpack( - NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN), - msg->payload); - if (!fe_msg) { - MGMTD_FE_ADAPTER_DBG( - "Failed to decode %d bytes from MGMTD Frontend adapter '%s'", - msg->hdr.len, adapter->name); - continue; - } + fe_msg = mgmtd__fe_message__unpack(NULL, len, data); + if (!fe_msg) { MGMTD_FE_ADAPTER_DBG( - "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend adapter '%s'", - msg->hdr.len, fe_msg->message_case, - fe_msg->message_case, adapter->name); - - (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg); - - mgmtd__fe_message__free_unpacked(fe_msg, NULL); - processed++; - adapter->num_msg_rx++; + "Failed to decode %zu bytes for adapter: %s", len, + adapter->name); + return; } - - return processed; + MGMTD_FE_ADAPTER_DBG( + "Decoded %zu bytes of message: %u from adapter: %s", len, + fe_msg->message_case, adapter->name); + (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg); + mgmtd__fe_message__free_unpacked(fe_msg, NULL); } static void mgmt_fe_adapter_proc_msgbufs(struct thread *thread) { - struct mgmt_fe_client_adapter *adapter; - struct stream *work; - int processed = 0; + struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread); - adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd >= 0); - - MGMTD_FE_ADAPTER_DBG("Have %d ibufs for client '%s' to process", - (int)stream_fifo_count_safe(adapter->ibuf_fifo), - adapter->name); - - for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) { - work = stream_fifo_pop_safe(adapter->ibuf_fifo); - if (!work) - break; - - processed += mgmt_fe_adapter_process_msg( - adapter, STREAM_DATA(work), stream_get_endp(work)); - - if (work != adapter->ibuf_work) { - /* Free it up */ - stream_free(work); - } else { - /* Reset stream buffer for next read */ - stream_reset(work); - } - } - - /* - * If we have more to process, reschedule for processing it. - */ - if (stream_fifo_head(adapter->ibuf_fifo)) + if (mgmt_msg_procbufs(&adapter->mstate, mgmt_fe_adapter_process_msg, + adapter, mgmt_debug_fe)) mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG); } static void mgmt_fe_adapter_read(struct thread *thread) { - struct mgmt_fe_client_adapter *adapter; - int bytes_read, msg_cnt; - size_t total_bytes, bytes_left; - struct mgmt_fe_msg_hdr *msg_hdr; - bool incomplete = false; + struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread); + enum mgmt_msg_rsched rv; - adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd); - - total_bytes = 0; - bytes_left = STREAM_SIZE(adapter->ibuf_work) - - stream_get_endp(adapter->ibuf_work); - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { - bytes_read = stream_read_try(adapter->ibuf_work, adapter->conn_fd, - bytes_left); - MGMTD_FE_ADAPTER_DBG( - "Got %d bytes of message from MGMTD Frontend adapter '%s'", - bytes_read, adapter->name); - if (bytes_read <= 0) { - if (bytes_read == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_fe_adapter_register_event( - adapter, MGMTD_FE_CONN_READ); - return; - } - - if (!bytes_read) { - /* Looks like connection closed */ - MGMTD_FE_ADAPTER_ERR( - "Got error (%d) while reading from MGMTD Frontend adapter '%s'. Err: '%s'", - bytes_read, adapter->name, - safe_strerror(errno)); - mgmt_fe_adapter_disconnect(adapter); - return; - } - break; - } - - total_bytes += bytes_read; - bytes_left -= bytes_read; - } - - /* - * Check if we would have read incomplete messages or not. - */ - stream_set_getp(adapter->ibuf_work, 0); - total_bytes = 0; - msg_cnt = 0; - bytes_left = stream_get_endp(adapter->ibuf_work); - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { - msg_hdr = - (struct mgmt_fe_msg_hdr *)(STREAM_DATA( - adapter->ibuf_work) - + total_bytes); - if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) { - /* Corrupted buffer. Force disconnect?? */ - MGMTD_FE_ADAPTER_ERR( - "Received corrupted buffer from MGMTD frontend client."); - mgmt_fe_adapter_disconnect(adapter); - return; - } - if (msg_hdr->len > bytes_left) - break; - - MGMTD_FE_ADAPTER_DBG("Got message (len: %u) from client '%s'", - msg_hdr->len, adapter->name); - - total_bytes += msg_hdr->len; - bytes_left -= msg_hdr->len; - msg_cnt++; - } - - if (bytes_left > 0) - incomplete = true; - /* - * We would have read one or several messages. - * Schedule processing them now. - */ - msg_hdr = (struct mgmt_fe_msg_hdr *)(STREAM_DATA(adapter->ibuf_work) - + total_bytes); - stream_set_endp(adapter->ibuf_work, total_bytes); - stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work); - adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - if (incomplete) { - stream_put(adapter->ibuf_work, msg_hdr, bytes_left); - stream_set_endp(adapter->ibuf_work, bytes_left); + rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe); + if (rv == MSR_DISCONNECT) { + mgmt_fe_adapter_disconnect(adapter); + return; } - - if (msg_cnt) + if (rv == MSR_SCHED_BOTH) mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG); - mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ); } static void mgmt_fe_adapter_write(struct thread *thread) { - int bytes_written = 0; - int processed = 0; - int msg_size = 0; - struct stream *s = NULL; - struct stream *free = NULL; - struct mgmt_fe_client_adapter *adapter; - - adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd); - - /* Ensure pushing any pending write buffer to FIFO */ - if (adapter->obuf_work) { - stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); - adapter->obuf_work = NULL; - } - - for (s = stream_fifo_head(adapter->obuf_fifo); - s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE; - s = stream_fifo_head(adapter->obuf_fifo)) { - /* msg_size = (int)stream_get_size(s); */ - msg_size = (int)STREAM_READABLE(s); - bytes_written = stream_flush(s, adapter->conn_fd); - if (bytes_written == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_fe_adapter_register_event( - adapter, MGMTD_FE_CONN_WRITE); - return; - } else if (bytes_written != msg_size) { - MGMTD_FE_ADAPTER_ERR( - "Could not write all %d bytes (wrote: %d) to MGMTD Frontend client socket. Err: '%s'", - msg_size, bytes_written, safe_strerror(errno)); - if (bytes_written > 0) { - stream_forward_getp(s, (size_t)bytes_written); - stream_pulldown(s); - mgmt_fe_adapter_register_event( - adapter, MGMTD_FE_CONN_WRITE); - return; - } - mgmt_fe_adapter_disconnect(adapter); - return; - } - - free = stream_fifo_pop(adapter->obuf_fifo); - stream_free(free); - MGMTD_FE_ADAPTER_DBG( - "Wrote %d bytes of message to MGMTD Frontend client socket.'", - bytes_written); - processed++; - } + struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread); + enum mgmt_msg_wsched rv; - if (s) { + rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe); + if (rv == MSW_SCHED_STREAM) + mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_WRITE); + else if (rv == MSW_DISCONNECT) + mgmt_fe_adapter_disconnect(adapter); + else if (rv == MSW_SCHED_WRITES_OFF) { mgmt_fe_adapter_writes_off(adapter); mgmt_fe_adapter_register_event(adapter, - MGMTD_FE_CONN_WRITES_ON); - } + MGMTD_FE_CONN_WRITES_ON); + } else + assert(rv == MSW_SCHED_NONE); } static void mgmt_fe_adapter_resume_writes(struct thread *thread) @@ -1683,7 +1478,7 @@ static void mgmt_fe_adapter_resume_writes(struct thread *thread) struct mgmt_fe_client_adapter *adapter; adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd); + assert(adapter && adapter->conn_fd != -1); mgmt_fe_adapter_writes_on(adapter); } @@ -1739,16 +1534,11 @@ mgmt_fe_adapter_unlock(struct mgmt_fe_client_adapter **adapter) (*adapter)->refcount--; if (!(*adapter)->refcount) { mgmt_fe_adapters_del(&mgmt_fe_adapters, *adapter); - - stream_fifo_free((*adapter)->ibuf_fifo); - stream_free((*adapter)->ibuf_work); - stream_fifo_free((*adapter)->obuf_fifo); - stream_free((*adapter)->obuf_work); - THREAD_OFF((*adapter)->conn_read_ev); THREAD_OFF((*adapter)->conn_write_ev); THREAD_OFF((*adapter)->proc_msg_ev); THREAD_OFF((*adapter)->conn_writes_on); + mgmt_msg_destroy(&(*adapter)->mstate); XFREE(MTYPE_MGMTD_FE_ADPATER, *adapter); } @@ -1793,11 +1583,10 @@ mgmt_fe_create_adapter(int conn_fd, union sockunion *from) snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d", adapter->conn_fd); mgmt_fe_sessions_init(&adapter->fe_sessions); - adapter->ibuf_fifo = stream_fifo_new(); - adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - adapter->obuf_fifo = stream_fifo_new(); - /* adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); */ - adapter->obuf_work = NULL; + + mgmt_msg_init(&adapter->mstate, MGMTD_FE_MAX_NUM_MSG_PROC, + MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN, + "FE-adapter"); mgmt_fe_adapter_lock(adapter); mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ); @@ -2083,9 +1872,14 @@ void mgmt_fe_adapter_status_write(struct vty *vty, bool detail) } vty_out(vty, " Total-Sessions: \t\t\t%d\n", (int)mgmt_fe_sessions_count(&adapter->fe_sessions)); - vty_out(vty, " Msg-Sent: \t\t\t\t%u\n", adapter->num_msg_tx); - vty_out(vty, " Msg-Recvd: \t\t\t\t%u\n", - adapter->num_msg_rx); + vty_out(vty, " Msg-Recvd: \t\t\t\t%" PRIu64 "\n", + adapter->mstate.nrxm); + vty_out(vty, " Bytes-Recvd: \t\t\t%" PRIu64 "\n", + adapter->mstate.nrxb); + vty_out(vty, " Msg-Sent: \t\t\t\t%" PRIu64 "\n", + adapter->mstate.ntxm); + vty_out(vty, " Bytes-Sent: \t\t\t%" PRIu64 "\n", + adapter->mstate.ntxb); } vty_out(vty, " Total: %d\n", (int)mgmt_fe_adapters_count(&mgmt_fe_adapters)); diff --git a/mgmtd/mgmt_fe_adapter.h b/mgmtd/mgmt_fe_adapter.h index 05d37d3f36..3389234a3f 100644 --- a/mgmtd/mgmt_fe_adapter.h +++ b/mgmtd/mgmt_fe_adapter.h @@ -9,6 +9,10 @@ #ifndef _FRR_MGMTD_FE_ADAPTER_H_ #define _FRR_MGMTD_FE_ADAPTER_H_ +#include "mgmt_fe_client.h" +#include "mgmt_msg.h" +#include "mgmtd/mgmt_defines.h" + struct mgmt_fe_client_adapter; struct mgmt_master; @@ -64,18 +68,9 @@ struct mgmt_fe_client_adapter { struct mgmt_fe_sessions_head fe_sessions; /* IO streams for read and write */ - /* pthread_mutex_t ibuf_mtx; */ - struct stream_fifo *ibuf_fifo; - /* pthread_mutex_t obuf_mtx; */ - struct stream_fifo *obuf_fifo; - - /* Private I/O buffers */ - struct stream *ibuf_work; - struct stream *obuf_work; + struct mgmt_msg_state mstate; int refcount; - uint32_t num_msg_tx; - uint32_t num_msg_rx; struct mgmt_commit_stats cmt_stats; struct mgmt_setcfg_stats setcfg_stats; diff --git a/mgmtd/mgmt_fe_server.c b/mgmtd/mgmt_fe_server.c index 2db4397cce..0b0a56ea65 100644 --- a/mgmtd/mgmt_fe_server.c +++ b/mgmtd/mgmt_fe_server.c @@ -28,7 +28,7 @@ zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) #endif /* REDIRECT_DEBUG_TO_STDERR */ -static int mgmt_fe_listen_fd; +static int mgmt_fe_listen_fd = -1; static struct thread_master *mgmt_fe_listen_tm; static struct thread *mgmt_fe_listen_ev; static void mgmt_fe_server_register_event(enum mgmt_fe_event event); diff --git a/mgmtd/mgmt_txn.c b/mgmtd/mgmt_txn.c index 115aa532c4..05b593798e 100644 --- a/mgmtd/mgmt_txn.c +++ b/mgmtd/mgmt_txn.c @@ -2471,6 +2471,8 @@ int mgmt_txn_notify_be_adapter_conn(struct mgmt_be_client_adapter *adapter, return -1; } + MGMTD_TXN_DBG("Created initial txn %llu for BE connection %s", + txn->txn_id, adapter->name); /* * Set the changeset for transaction to commit and trigger the * commit request. @@ -2608,53 +2610,7 @@ int mgmt_txn_notify_be_cfgdata_reply( return 0; } -int mgmt_txn_notify_be_cfg_validate_reply( - uint64_t txn_id, bool success, uint64_t batch_ids[], - size_t num_batch_ids, char *error_if_any, - struct mgmt_be_client_adapter *adapter) -{ - struct mgmt_txn_ctx *txn; - struct mgmt_txn_be_cfg_batch *cfg_btch; - struct mgmt_commit_cfg_req *cmtcfg_req = NULL; - size_t indx; - - txn = mgmt_txn_id2ctx(txn_id); - if (!txn || txn->type != MGMTD_TXN_TYPE_CONFIG) - return -1; - - assert(txn->commit_cfg_req); - cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; - - if (!success) { - MGMTD_TXN_ERR( - "CFGDATA_VALIDATE_REQ sent to '%s' failed for Txn %p, Batches [0x%llx - 0x%llx], Err: %s", - adapter->name, txn, (unsigned long long)batch_ids[0], - (unsigned long long)batch_ids[num_batch_ids - 1], - error_if_any ? error_if_any : "None"); - mgmt_txn_send_commit_cfg_reply( - txn, MGMTD_INTERNAL_ERROR, - "Internal error! Failed to validate config data on backend!"); - return 0; - } - - for (indx = 0; indx < num_batch_ids; indx++) { - cfg_btch = mgmt_txn_cfgbatch_id2ctx(txn, batch_ids[indx]); - if (cfg_btch->txn != txn) - return -1; - mgmt_move_txn_cfg_batch_to_next( - cmtcfg_req, cfg_btch, - &cmtcfg_req->curr_batches[adapter->id], - &cmtcfg_req->next_batches[adapter->id], true, - MGMTD_COMMIT_PHASE_APPLY_CFG); - } - - mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req); - - return 0; -} - -extern int -mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success, +int mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success, uint64_t batch_ids[], size_t num_batch_ids, char *error_if_any, struct mgmt_be_client_adapter *adapter) @@ -2852,6 +2808,8 @@ int mgmt_txn_rollback_trigger_cfg_apply(struct mgmt_ds_ctx *src_ds_ctx, return -1; } + MGMTD_TXN_DBG("Created rollback txn %llu", txn->txn_id); + /* * Set the changeset for transaction to commit and trigger the commit * request. -- 2.39.5