diff options
Diffstat (limited to 'lib/mgmt_fe_client.c')
| -rw-r--r-- | lib/mgmt_fe_client.c | 413 |
1 files changed, 74 insertions, 339 deletions
diff --git a/lib/mgmt_fe_client.c b/lib/mgmt_fe_client.c index 22c3a06b9e..73154401ec 100644 --- a/lib/mgmt_fe_client.c +++ b/lib/mgmt_fe_client.c @@ -9,6 +9,7 @@ #include "memory.h" #include "libfrr.h" #include "mgmt_fe_client.h" +#include "mgmt_msg.h" #include "mgmt_pb.h" #include "network.h" #include "stream.h" @@ -55,13 +56,8 @@ struct mgmt_fe_client_ctx { struct thread *conn_writes_on; struct thread *msg_proc_ev; uint32_t flags; - uint32_t num_msg_tx; - uint32_t num_msg_rx; - struct stream_fifo *ibuf_fifo; - struct stream *ibuf_work; - struct stream_fifo *obuf_fifo; - struct stream *obuf_work; + struct mgmt_msg_state mstate; struct mgmt_fe_client_params client_params; @@ -75,7 +71,9 @@ struct mgmt_fe_client_ctx { static bool mgmt_debug_fe_client; -static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {0}; +static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = { + .conn_fd = -1, +}; /* Forward declarations */ static void @@ -124,9 +122,9 @@ static void mgmt_fe_server_disconnect(struct mgmt_fe_client_ctx *client_ctx, bool reconnect) { - if (client_ctx->conn_fd) { + if (client_ctx->conn_fd != -1) { close(client_ctx->conn_fd); - client_ctx->conn_fd = 0; + client_ctx->conn_fd = -1; } if (reconnect) @@ -148,9 +146,7 @@ mgmt_fe_client_writes_on(struct mgmt_fe_client_ctx *client_ctx) { MGMTD_FE_CLIENT_DBG("Resume writing msgs"); UNSET_FLAG(client_ctx->flags, MGMTD_FE_CLIENT_FLAGS_WRITES_OFF); - if (client_ctx->obuf_work - || stream_fifo_count_safe(client_ctx->obuf_fifo)) - mgmt_fe_client_sched_msg_write(client_ctx); + mgmt_fe_client_sched_msg_write(client_ctx); } static inline void @@ -160,101 +156,42 @@ mgmt_fe_client_writes_off(struct mgmt_fe_client_ctx *client_ctx) MGMTD_FE_CLIENT_DBG("Paused writing msgs"); } -static int -mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx, - Mgmtd__FeMessage *fe_msg) +static int mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx, + Mgmtd__FeMessage *fe_msg) { - size_t msg_size; - uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN]; - struct mgmt_fe_msg *msg; - - if (client_ctx->conn_fd == 0) - return -1; - - msg_size = mgmtd__fe_message__get_packed_size(fe_msg); - msg_size += MGMTD_FE_MSG_HDR_LEN; - if (msg_size > sizeof(msg_buf)) { - MGMTD_FE_CLIENT_ERR( - "Message size %d more than max size'%d. Not sending!'", - (int)msg_size, (int)sizeof(msg_buf)); + /* users current expect this to fail here */ + if (client_ctx->conn_fd == -1) { + MGMTD_FE_CLIENT_DBG("can't send message on closed connection"); return -1; } - msg = (struct mgmt_fe_msg *)msg_buf; - msg->hdr.marker = MGMTD_FE_MSG_MARKER; - msg->hdr.len = (uint16_t)msg_size; - mgmtd__fe_message__pack(fe_msg, msg->payload); - - if (!client_ctx->obuf_work) - client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) { - stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); - client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - } - stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size); - + int rv = mgmt_msg_send_msg( + &client_ctx->mstate, fe_msg, + mgmtd__fe_message__get_packed_size(fe_msg), + (size_t(*)(void *, void *))mgmtd__fe_message__pack, + mgmt_debug_fe_client); mgmt_fe_client_sched_msg_write(client_ctx); - client_ctx->num_msg_tx++; - return 0; + return rv; } static void mgmt_fe_client_write(struct thread *thread) { - int bytes_written = 0; - int processed = 0; - int msg_size = 0; - struct stream *s = NULL; - struct stream *free = NULL; struct mgmt_fe_client_ctx *client_ctx; + enum mgmt_msg_wsched rv; client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - /* Ensure pushing any pending write buffer to FIFO */ - if (client_ctx->obuf_work) { - stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); - client_ctx->obuf_work = NULL; - } - - for (s = stream_fifo_head(client_ctx->obuf_fifo); - s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE; - s = stream_fifo_head(client_ctx->obuf_fifo)) { - /* msg_size = (int)stream_get_size(s); */ - msg_size = (int)STREAM_READABLE(s); - bytes_written = stream_flush(s, client_ctx->conn_fd); - if (bytes_written == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_fe_client_register_event( - client_ctx, MGMTD_FE_CONN_WRITE); - return; - } else if (bytes_written != msg_size) { - MGMTD_FE_CLIENT_ERR( - "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", - msg_size, bytes_written, safe_strerror(errno)); - if (bytes_written > 0) { - stream_forward_getp(s, (size_t)bytes_written); - stream_pulldown(s); - mgmt_fe_client_register_event( - client_ctx, MGMTD_FE_CONN_WRITE); - return; - } - mgmt_fe_server_disconnect(client_ctx, true); - return; - } - - free = stream_fifo_pop(client_ctx->obuf_fifo); - stream_free(free); - MGMTD_FE_CLIENT_DBG( - "Wrote %d bytes of message to MGMTD Backend client socket.'", - bytes_written); - processed++; - } - - if (s) { + rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd, + mgmt_debug_fe_client); + if (rv == MSW_SCHED_STREAM) + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_WRITE); + else if (rv == MSW_DISCONNECT) + mgmt_fe_server_disconnect(client_ctx, true); + else if (rv == MSW_SCHED_WRITES_OFF) { mgmt_fe_client_writes_off(client_ctx); mgmt_fe_client_register_event(client_ctx, - MGMTD_FE_CONN_WRITES_ON); - } + MGMTD_FE_CONN_WRITES_ON); + } else + assert(rv == MSW_SCHED_NONE); } static void mgmt_fe_client_resume_writes(struct thread *thread) @@ -262,7 +199,7 @@ static void mgmt_fe_client_resume_writes(struct thread *thread) struct mgmt_fe_client_ctx *client_ctx; client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); + assert(client_ctx && client_ctx->conn_fd != -1); mgmt_fe_client_writes_on(client_ctx); } @@ -713,271 +650,83 @@ mgmt_fe_client_handle_msg(struct mgmt_fe_client_ctx *client_ctx, return 0; } -static int -mgmt_fe_client_process_msg(struct mgmt_fe_client_ctx *client_ctx, - uint8_t *msg_buf, int bytes_read) +static void mgmt_fe_client_process_msg(void *user_ctx, uint8_t *data, + size_t len) { + struct mgmt_fe_client_ctx *client_ctx = user_ctx; Mgmtd__FeMessage *fe_msg; - struct mgmt_fe_msg *msg; - uint16_t bytes_left; - uint16_t processed = 0; - MGMTD_FE_CLIENT_DBG( - "Have %u bytes of messages from MGMTD Frontend server to .", - bytes_read); - - bytes_left = bytes_read; - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN; - bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { - msg = (struct mgmt_fe_msg *)msg_buf; - if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) { - MGMTD_FE_CLIENT_DBG( - "Marker not found in message from MGMTD Frontend server."); - break; - } - - if (bytes_left < msg->hdr.len) { - MGMTD_FE_CLIENT_DBG( - "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend server.", - bytes_left, msg->hdr.len); - break; - } - - fe_msg = mgmtd__fe_message__unpack( - NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN), - msg->payload); - if (!fe_msg) { - MGMTD_FE_CLIENT_DBG( - "Failed to decode %d bytes from MGMTD Frontend server.", - msg->hdr.len); - continue; - } - - MGMTD_FE_CLIENT_DBG( - "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend server", - msg->hdr.len, fe_msg->message_case, - fe_msg->message_case); - - (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg); - - mgmtd__fe_message__free_unpacked(fe_msg, NULL); - processed++; - client_ctx->num_msg_rx++; + fe_msg = mgmtd__fe_message__unpack(NULL, len, data); + if (!fe_msg) { + MGMTD_FE_CLIENT_DBG("Failed to decode %zu bytes from server.", + len); + return; } - - return processed; + MGMTD_FE_CLIENT_DBG( + "Decoded %zu bytes of message(msg: %u/%u) from server", len, + fe_msg->message_case, fe_msg->message_case); + (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg); + mgmtd__fe_message__free_unpacked(fe_msg, NULL); } static void mgmt_fe_client_proc_msgbufs(struct thread *thread) { struct mgmt_fe_client_ctx *client_ctx; - struct stream *work; - int processed = 0; client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) { - work = stream_fifo_pop_safe(client_ctx->ibuf_fifo); - if (!work) - break; - - processed += mgmt_fe_client_process_msg( - client_ctx, STREAM_DATA(work), stream_get_endp(work)); - - if (work != client_ctx->ibuf_work) { - /* Free it up */ - stream_free(work); - } else { - /* Reset stream buffer for next read */ - stream_reset(work); - } - } - - /* - * If we have more to process, reschedule for processing it. - */ - if (stream_fifo_head(client_ctx->ibuf_fifo)) - mgmt_fe_client_register_event(client_ctx, - MGMTD_FE_PROC_MSG); + if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_fe_client_process_msg, + client_ctx, mgmt_debug_fe_client)) + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG); } static void mgmt_fe_client_read(struct thread *thread) { struct mgmt_fe_client_ctx *client_ctx; - int bytes_read, msg_cnt; - size_t total_bytes, bytes_left; - struct mgmt_fe_msg_hdr *msg_hdr; - bool incomplete = false; + enum mgmt_msg_rsched rv; client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - total_bytes = 0; - bytes_left = STREAM_SIZE(client_ctx->ibuf_work) - - stream_get_endp(client_ctx->ibuf_work); - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { - bytes_read = stream_read_try(client_ctx->ibuf_work, - client_ctx->conn_fd, bytes_left); - MGMTD_FE_CLIENT_DBG( - "Got %d bytes of message from MGMTD Frontend server", - bytes_read); - /* -2 is normal nothing read, and to retry */ - if (bytes_read == -2) - break; - if (bytes_read <= 0) { - if (bytes_read == 0) { - MGMTD_FE_CLIENT_ERR( - "Got EOF/disconnect while reading from MGMTD Frontend server."); - } else { - /* Fatal error */ - MGMTD_FE_CLIENT_ERR( - "Got error (%d) while reading from MGMTD Frontend server. Err: '%s'", - bytes_read, safe_strerror(errno)); - } - mgmt_fe_server_disconnect(client_ctx, true); - return; - } - total_bytes += bytes_read; - bytes_left -= bytes_read; - } - - /* - * Check if we have read complete messages or not. - */ - stream_set_getp(client_ctx->ibuf_work, 0); - total_bytes = 0; - msg_cnt = 0; - bytes_left = stream_get_endp(client_ctx->ibuf_work); - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { - msg_hdr = (struct mgmt_fe_msg_hdr - *)(STREAM_DATA(client_ctx->ibuf_work) - + total_bytes); - if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) { - /* Corrupted buffer. Force disconnect?? */ - MGMTD_FE_CLIENT_ERR( - "Received corrupted buffer from MGMTD frontend server."); - mgmt_fe_server_disconnect(client_ctx, true); - return; - } - if (msg_hdr->len > bytes_left) - break; - - total_bytes += msg_hdr->len; - bytes_left -= msg_hdr->len; - msg_cnt++; - } - - if (!msg_cnt) - goto resched; - - if (bytes_left > 0) - incomplete = true; - /* - * We have read one or several messages. - * Schedule processing them now. - */ - msg_hdr = - (struct mgmt_fe_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) - + total_bytes); - stream_set_endp(client_ctx->ibuf_work, total_bytes); - stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work); - client_ctx->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - if (incomplete) { - stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left); - stream_set_endp(client_ctx->ibuf_work, bytes_left); + rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd, + mgmt_debug_fe_client); + if (rv == MSR_DISCONNECT) { + mgmt_fe_server_disconnect(client_ctx, true); + return; } - - mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG); - -resched: + if (rv == MSR_SCHED_BOTH) + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG); mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ); } -static int mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx) +static void mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx) { - int ret, sock, len; - struct sockaddr_un addr; - - MGMTD_FE_CLIENT_DBG( - "Trying to connect to MGMTD Frontend server at %s", - MGMTD_FE_SERVER_PATH); - - assert(!client_ctx->conn_fd); + const char *dbgtag = mgmt_debug_fe_client ? "FE-client" : NULL; - sock = socket(AF_UNIX, SOCK_STREAM, 0); - if (sock < 0) { - MGMTD_FE_CLIENT_ERR("Failed to create socket"); - goto mgmt_fe_server_connect_failed; - } + assert(client_ctx->conn_fd == -1); + client_ctx->conn_fd = mgmt_msg_connect( + MGMTD_FE_SERVER_PATH, MGMTD_SOCKET_FE_SEND_BUF_SIZE, + MGMTD_SOCKET_FE_RECV_BUF_SIZE, dbgtag); - MGMTD_FE_CLIENT_DBG( - "Created MGMTD Frontend server socket successfully!"); - - memset(&addr, 0, sizeof(struct sockaddr_un)); - addr.sun_family = AF_UNIX; - strlcpy(addr.sun_path, MGMTD_FE_SERVER_PATH, sizeof(addr.sun_path)); -#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN - len = addr.sun_len = SUN_LEN(&addr); -#else - len = sizeof(addr.sun_family) + strlen(addr.sun_path); -#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */ - - ret = connect(sock, (struct sockaddr *)&addr, len); - if (ret < 0) { - MGMTD_FE_CLIENT_ERR( - "Failed to connect to MGMTD Frontend Server at %s. Err: %s", - addr.sun_path, safe_strerror(errno)); - close(sock); - goto mgmt_fe_server_connect_failed; + /* Send REGISTER_REQ message */ + if (client_ctx->conn_fd == -1 || + mgmt_fe_send_register_req(client_ctx) != 0) { + mgmt_fe_server_disconnect(client_ctx, true); + return; } - MGMTD_FE_CLIENT_DBG( - "Connected to MGMTD Frontend Server at %s successfully!", - addr.sun_path); - client_ctx->conn_fd = sock; - - /* Make client socket non-blocking. */ - set_nonblocking(sock); - setsockopt_so_sendbuf(client_ctx->conn_fd, - MGMTD_SOCKET_FE_SEND_BUF_SIZE); - setsockopt_so_recvbuf(client_ctx->conn_fd, - MGMTD_SOCKET_FE_RECV_BUF_SIZE); - - thread_add_read(client_ctx->tm, mgmt_fe_client_read, - (void *)&mgmt_fe_client_ctx, client_ctx->conn_fd, - &client_ctx->conn_read_ev); - assert(client_ctx->conn_read_ev); - - /* Send REGISTER_REQ message */ - if (mgmt_fe_send_register_req(client_ctx) != 0) - goto mgmt_fe_server_connect_failed; + /* Start reading from the socket */ + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ); /* Notify client through registered callback (if any) */ if (client_ctx->client_params.client_connect_notify) (void)(*client_ctx->client_params.client_connect_notify)( (uintptr_t)client_ctx, client_ctx->client_params.user_data, true); - - return 0; - -mgmt_fe_server_connect_failed: - if (sock && sock != client_ctx->conn_fd) - close(sock); - - mgmt_fe_server_disconnect(client_ctx, true); - return -1; } + static void mgmt_fe_client_conn_timeout(struct thread *thread) { - struct mgmt_fe_client_ctx *client_ctx; - - client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); - assert(client_ctx); - - mgmt_fe_server_connect(client_ctx); + mgmt_fe_server_connect(THREAD_ARG(thread)); } static void @@ -1046,16 +795,9 @@ uintptr_t mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params, mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec = MGMTD_FE_DEFAULT_CONN_RETRY_INTVL_SEC; - assert(!mgmt_fe_client_ctx.ibuf_fifo - && !mgmt_fe_client_ctx.ibuf_work - && !mgmt_fe_client_ctx.obuf_fifo - && !mgmt_fe_client_ctx.obuf_work); - - mgmt_fe_client_ctx.ibuf_fifo = stream_fifo_new(); - mgmt_fe_client_ctx.ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - mgmt_fe_client_ctx.obuf_fifo = stream_fifo_new(); - - mgmt_fe_client_ctx.obuf_work = NULL; + mgmt_msg_init(&mgmt_fe_client_ctx.mstate, MGMTD_FE_MAX_NUM_MSG_PROC, + MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN, + "FE-client"); mgmt_sessions_init(&mgmt_fe_client_ctx.client_sessions); @@ -1322,8 +1064,6 @@ void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl) mgmt_fe_server_disconnect(client_ctx, false); - assert(mgmt_fe_client_ctx.ibuf_fifo && mgmt_fe_client_ctx.obuf_fifo); - mgmt_fe_destroy_client_sessions(lib_hndl); THREAD_OFF(client_ctx->conn_retry_tmr); @@ -1331,10 +1071,5 @@ void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl) THREAD_OFF(client_ctx->conn_write_ev); THREAD_OFF(client_ctx->conn_writes_on); THREAD_OFF(client_ctx->msg_proc_ev); - stream_fifo_free(mgmt_fe_client_ctx.ibuf_fifo); - if (mgmt_fe_client_ctx.ibuf_work) - stream_free(mgmt_fe_client_ctx.ibuf_work); - stream_fifo_free(mgmt_fe_client_ctx.obuf_fifo); - if (mgmt_fe_client_ctx.obuf_work) - stream_free(mgmt_fe_client_ctx.obuf_work); + mgmt_msg_destroy(&client_ctx->mstate); } |
