diff options
Diffstat (limited to 'lib/mgmt_be_client.c')
| -rw-r--r-- | lib/mgmt_be_client.c | 436 |
1 files changed, 85 insertions, 351 deletions
diff --git a/lib/mgmt_be_client.c b/lib/mgmt_be_client.c index 1e98c6123d..4dc7b296b8 100644 --- a/lib/mgmt_be_client.c +++ b/lib/mgmt_be_client.c @@ -9,6 +9,7 @@ #include "libfrr.h" #include "mgmtd/mgmt.h" #include "mgmt_be_client.h" +#include "mgmt_msg.h" #include "mgmt_pb.h" #include "network.h" #include "stream.h" @@ -111,14 +112,8 @@ struct mgmt_be_client_ctx { struct thread *conn_writes_on; struct thread *msg_proc_ev; uint32_t flags; - uint32_t num_msg_tx; - uint32_t num_msg_rx; - struct stream_fifo *ibuf_fifo; - struct stream *ibuf_work; - struct stream_fifo *obuf_fifo; - struct stream *obuf_work; - uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN]; + struct mgmt_msg_state mstate; struct nb_config *candidate_config; struct nb_config *running_config; @@ -143,7 +138,9 @@ struct mgmt_be_client_ctx { static bool mgmt_debug_be_client; -static struct mgmt_be_client_ctx mgmt_be_client_ctx = {0}; +static struct mgmt_be_client_ctx mgmt_be_client_ctx = { + .conn_fd = -1, +}; const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = { #ifdef HAVE_STATICD @@ -168,14 +165,13 @@ mgmt_be_server_disconnect(struct mgmt_be_client_ctx *client_ctx, { /* Notify client through registered callback (if any) */ if (client_ctx->client_params.client_connect_notify) - (void)(*client_ctx->client_params - .client_connect_notify)( + (void)(*client_ctx->client_params.client_connect_notify)( (uintptr_t)client_ctx, client_ctx->client_params.user_data, false); - if (client_ctx->conn_fd) { + if (client_ctx->conn_fd != -1) { close(client_ctx->conn_fd); - client_ctx->conn_fd = 0; + client_ctx->conn_fd = -1; } if (reconnect) @@ -881,181 +877,47 @@ mgmt_be_client_handle_msg(struct mgmt_be_client_ctx *client_ctx, return 0; } -static int -mgmt_be_client_process_msg(struct mgmt_be_client_ctx *client_ctx, - uint8_t *msg_buf, int bytes_read) +static void mgmt_be_client_process_msg(void *user_ctx, uint8_t *data, + size_t len) { + struct mgmt_be_client_ctx *client_ctx = user_ctx; Mgmtd__BeMessage *be_msg; - struct mgmt_be_msg *msg; - uint16_t bytes_left; - uint16_t processed = 0; - - MGMTD_BE_CLIENT_DBG( - "Got message of %d bytes from MGMTD Backend Server", - bytes_read); - - bytes_left = bytes_read; - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN; - bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { - msg = (struct mgmt_be_msg *)msg_buf; - if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) { - MGMTD_BE_CLIENT_DBG( - "Marker not found in message from MGMTD '%s'", - client_ctx->client_params.name); - break; - } - - if (bytes_left < msg->hdr.len) { - MGMTD_BE_CLIENT_DBG( - "Incomplete message of %d bytes (epxected: %u) from MGMTD '%s'", - bytes_left, msg->hdr.len, - client_ctx->client_params.name); - break; - } - - be_msg = mgmtd__be_message__unpack( - NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN), - msg->payload); - if (!be_msg) { - MGMTD_BE_CLIENT_DBG( - "Failed to decode %d bytes from MGMTD '%s'", - msg->hdr.len, client_ctx->client_params.name); - continue; - } - (void)mgmt_be_client_handle_msg(client_ctx, be_msg); - mgmtd__be_message__free_unpacked(be_msg, NULL); - processed++; - client_ctx->num_msg_rx++; + be_msg = mgmtd__be_message__unpack(NULL, len, data); + if (!be_msg) { + MGMTD_BE_CLIENT_DBG("Failed to decode %zu bytes from server", + len); + return; } - - return processed; + MGMTD_BE_CLIENT_DBG( + "Decoded %zu bytes of message(msg: %u/%u) from server", len, + be_msg->message_case, be_msg->message_case); + (void)mgmt_be_client_handle_msg(client_ctx, be_msg); + mgmtd__be_message__free_unpacked(be_msg, NULL); } static void mgmt_be_client_proc_msgbufs(struct thread *thread) { - struct mgmt_be_client_ctx *client_ctx; - struct stream *work; - int processed = 0; - - client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx); - - if (client_ctx->conn_fd == 0) - return; - - for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) { - work = stream_fifo_pop_safe(client_ctx->ibuf_fifo); - if (!work) - break; - - processed += mgmt_be_client_process_msg( - client_ctx, STREAM_DATA(work), stream_get_endp(work)); - - if (work != client_ctx->ibuf_work) { - /* Free it up */ - stream_free(work); - } else { - /* Reset stream buffer for next read */ - stream_reset(work); - } - } + struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread); - /* - * If we have more to process, reschedule for processing it. - */ - if (stream_fifo_head(client_ctx->ibuf_fifo)) - mgmt_be_client_register_event(client_ctx, - MGMTD_BE_PROC_MSG); + if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_be_client_process_msg, + client_ctx, mgmt_debug_be_client)) + mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); } static void mgmt_be_client_read(struct thread *thread) { - struct mgmt_be_client_ctx *client_ctx; - int bytes_read, msg_cnt; - size_t total_bytes, bytes_left; - struct mgmt_be_msg_hdr *msg_hdr; - bool incomplete = false; - - client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - total_bytes = 0; - bytes_left = STREAM_SIZE(client_ctx->ibuf_work) - - stream_get_endp(client_ctx->ibuf_work); - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { - bytes_read = stream_read_try(client_ctx->ibuf_work, - client_ctx->conn_fd, bytes_left); - MGMTD_BE_CLIENT_DBG( - "Got %d bytes of message from MGMTD Backend server", - bytes_read); - /* -2 is normal nothing read, and to retry */ - if (bytes_read == -2) - break; - if (bytes_read <= 0) { - if (bytes_read == 0) { - MGMTD_BE_CLIENT_ERR( - "Got EOF/disconnect while reading from MGMTD Frontend server."); - } else { - MGMTD_BE_CLIENT_ERR( - "Got error (%d) while reading from MGMTD Backend server. Err: '%s'", - bytes_read, safe_strerror(errno)); - } - mgmt_be_server_disconnect(client_ctx, true); - return; - } - total_bytes += bytes_read; - bytes_left -= bytes_read; - } - /* - * Check if we have read complete messages or not. - */ - stream_set_getp(client_ctx->ibuf_work, 0); - total_bytes = 0; - msg_cnt = 0; - bytes_left = stream_get_endp(client_ctx->ibuf_work); - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { - msg_hdr = (struct mgmt_be_msg_hdr - *)(STREAM_DATA(client_ctx->ibuf_work) - + total_bytes); - if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) { - /* Corrupted buffer. Force disconnect?? */ - MGMTD_BE_CLIENT_ERR( - "Received corrupted buffer from MGMTD backend server."); - mgmt_be_server_disconnect(client_ctx, true); - return; - } - if (msg_hdr->len > bytes_left) - break; - - total_bytes += msg_hdr->len; - bytes_left -= msg_hdr->len; - msg_cnt++; - } + struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread); + enum mgmt_msg_rsched rv; - if (!msg_cnt) - goto resched; - - if (bytes_left > 0) - incomplete = true; - - /* - * We have read one or several messages. - * Schedule processing them now. - */ - msg_hdr = - (struct mgmt_be_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) + - total_bytes); - stream_set_endp(client_ctx->ibuf_work, total_bytes); - stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work); - client_ctx->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - if (incomplete) { - stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left); - stream_set_endp(client_ctx->ibuf_work, bytes_left); + rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd, + mgmt_debug_be_client); + if (rv == MSR_DISCONNECT) { + mgmt_be_server_disconnect(client_ctx, true); + return; } - mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); - -resched: + if (rv == MSR_SCHED_BOTH) + mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); } @@ -1072,9 +934,7 @@ mgmt_be_client_writes_on(struct mgmt_be_client_ctx *client_ctx) { MGMTD_BE_CLIENT_DBG("Resume writing msgs"); UNSET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF); - if (client_ctx->obuf_work - || stream_fifo_count_safe(client_ctx->obuf_fifo)) - mgmt_be_client_sched_msg_write(client_ctx); + mgmt_be_client_sched_msg_write(client_ctx); } static inline void @@ -1085,99 +945,39 @@ mgmt_be_client_writes_off(struct mgmt_be_client_ctx *client_ctx) } static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, - Mgmtd__BeMessage *be_msg) + Mgmtd__BeMessage *be_msg) { - size_t msg_size; - uint8_t *msg_buf = client_ctx->msg_buf; - struct mgmt_be_msg *msg; - - if (client_ctx->conn_fd == 0) - return -1; - - msg_size = mgmtd__be_message__get_packed_size(be_msg); - msg_size += MGMTD_BE_MSG_HDR_LEN; - if (msg_size > MGMTD_BE_MSG_MAX_LEN) { - MGMTD_BE_CLIENT_ERR( - "Message size %d more than max size'%d. Not sending!'", - (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN); + if (client_ctx->conn_fd == -1) { + MGMTD_BE_CLIENT_DBG("can't send message on closed connection"); return -1; } - msg = (struct mgmt_be_msg *)msg_buf; - msg->hdr.marker = MGMTD_BE_MSG_MARKER; - msg->hdr.len = (uint16_t)msg_size; - mgmtd__be_message__pack(be_msg, msg->payload); - - if (!client_ctx->obuf_work) - client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) { - stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); - client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - } - stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size); - + int rv = mgmt_msg_send_msg( + &client_ctx->mstate, be_msg, + mgmtd__be_message__get_packed_size(be_msg), + (size_t(*)(void *, void *))mgmtd__be_message__pack, + mgmt_debug_be_client); mgmt_be_client_sched_msg_write(client_ctx); - client_ctx->num_msg_tx++; - return 0; + return rv; } static void mgmt_be_client_write(struct thread *thread) { - int bytes_written = 0; - int processed = 0; - int msg_size = 0; - struct stream *s = NULL; - struct stream *free = NULL; - struct mgmt_be_client_ctx *client_ctx; - - client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); - - /* Ensure pushing any pending write buffer to FIFO */ - if (client_ctx->obuf_work) { - stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); - client_ctx->obuf_work = NULL; - } - - for (s = stream_fifo_head(client_ctx->obuf_fifo); - s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE; - s = stream_fifo_head(client_ctx->obuf_fifo)) { - /* msg_size = (int)stream_get_size(s); */ - msg_size = (int)STREAM_READABLE(s); - bytes_written = stream_flush(s, client_ctx->conn_fd); - if (bytes_written == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_be_client_register_event( - client_ctx, MGMTD_BE_CONN_WRITE); - return; - } else if (bytes_written != msg_size) { - MGMTD_BE_CLIENT_ERR( - "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", - msg_size, bytes_written, safe_strerror(errno)); - if (bytes_written > 0) { - stream_forward_getp(s, (size_t)bytes_written); - stream_pulldown(s); - mgmt_be_client_register_event( - client_ctx, MGMTD_BE_CONN_WRITE); - return; - } - mgmt_be_server_disconnect(client_ctx, true); - return; - } - - free = stream_fifo_pop(client_ctx->obuf_fifo); - stream_free(free); - MGMTD_BE_CLIENT_DBG( - "Wrote %d bytes of message to MGMTD Backend client socket.'", - bytes_written); - processed++; - } - - if (s) { + struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread); + enum mgmt_msg_wsched rv; + + rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd, + mgmt_debug_be_client); + if (rv == MSW_SCHED_STREAM) + mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_WRITE); + else if (rv == MSW_DISCONNECT) + mgmt_be_server_disconnect(client_ctx, true); + else if (rv == MSW_SCHED_WRITES_OFF) { mgmt_be_client_writes_off(client_ctx); mgmt_be_client_register_event(client_ctx, - MGMTD_BE_CONN_WRITES_ON); - } + MGMTD_BE_CONN_WRITES_ON); + } else + assert(rv == MSW_SCHED_NONE); } static void mgmt_be_client_resume_writes(struct thread *thread) @@ -1185,15 +985,14 @@ static void mgmt_be_client_resume_writes(struct thread *thread) struct mgmt_be_client_ctx *client_ctx; client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx && client_ctx->conn_fd); + assert(client_ctx && client_ctx->conn_fd != -1); mgmt_be_client_writes_on(client_ctx); } static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx, - bool subscr_xpaths, - uint16_t num_reg_xpaths, - char **reg_xpaths) + bool subscr_xpaths, uint16_t num_reg_xpaths, + char **reg_xpaths) { Mgmtd__BeMessage be_msg; Mgmtd__BeSubscribeReq subscr_req; @@ -1214,86 +1013,35 @@ static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx, return mgmt_be_client_send_msg(client_ctx, &be_msg); } -static int mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx) +static void mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx) { - int ret, sock, len; - struct sockaddr_un addr; - - MGMTD_BE_CLIENT_DBG("Trying to connect to MGMTD Backend server at %s", - MGMTD_BE_SERVER_PATH); - - assert(!client_ctx->conn_fd); + const char *dbgtag = mgmt_debug_be_client ? "BE-client" : NULL; - sock = socket(AF_UNIX, SOCK_STREAM, 0); - if (sock < 0) { - MGMTD_BE_CLIENT_ERR("Failed to create socket"); - goto mgmt_be_server_connect_failed; - } - - MGMTD_BE_CLIENT_DBG( - "Created MGMTD Backend server socket successfully!"); + assert(client_ctx->conn_fd == -1); + client_ctx->conn_fd = mgmt_msg_connect( + MGMTD_BE_SERVER_PATH, MGMTD_SOCKET_BE_SEND_BUF_SIZE, + MGMTD_SOCKET_BE_RECV_BUF_SIZE, dbgtag); - memset(&addr, 0, sizeof(struct sockaddr_un)); - addr.sun_family = AF_UNIX; - strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path)); -#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN - len = addr.sun_len = SUN_LEN(&addr); -#else - len = sizeof(addr.sun_family) + strlen(addr.sun_path); -#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */ - - ret = connect(sock, (struct sockaddr *)&addr, len); - if (ret < 0) { - MGMTD_BE_CLIENT_ERR( - "Failed to connect to MGMTD Backend Server at %s. Err: %s", - addr.sun_path, safe_strerror(errno)); - close(sock); - goto mgmt_be_server_connect_failed; + /* Send SUBSCRIBE_REQ message */ + if (client_ctx->conn_fd == -1 || + mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) { + mgmt_be_server_disconnect(client_ctx, true); + return; } - MGMTD_BE_CLIENT_DBG( - "Connected to MGMTD Backend Server at %s successfully!", - addr.sun_path); - client_ctx->conn_fd = sock; - - /* Make client socket non-blocking. */ - set_nonblocking(sock); - setsockopt_so_sendbuf(client_ctx->conn_fd, - MGMTD_SOCKET_BE_SEND_BUF_SIZE); - setsockopt_so_recvbuf(client_ctx->conn_fd, - MGMTD_SOCKET_BE_RECV_BUF_SIZE); - + /* Start reading from the socket */ mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); /* Notify client through registered callback (if any) */ if (client_ctx->client_params.client_connect_notify) - (void)(*client_ctx->client_params - .client_connect_notify)( + (void)(*client_ctx->client_params.client_connect_notify)( (uintptr_t)client_ctx, client_ctx->client_params.user_data, true); - - /* Send SUBSCRIBE_REQ message */ - if (mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) - goto mgmt_be_server_connect_failed; - - return 0; - -mgmt_be_server_connect_failed: - if (sock && sock != client_ctx->conn_fd) - close(sock); - - mgmt_be_server_disconnect(client_ctx, true); - return -1; } static void mgmt_be_client_conn_timeout(struct thread *thread) { - struct mgmt_be_client_ctx *client_ctx; - - client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); - assert(client_ctx); - - mgmt_be_server_connect(client_ctx); + mgmt_be_server_connect(THREAD_ARG(thread)); } static void @@ -1317,16 +1065,15 @@ mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, break; case MGMTD_BE_PROC_MSG: tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC; - thread_add_timer_tv(client_ctx->tm, - mgmt_be_client_proc_msgbufs, client_ctx, - &tv, &client_ctx->msg_proc_ev); + thread_add_timer_tv(client_ctx->tm, mgmt_be_client_proc_msgbufs, + client_ctx, &tv, &client_ctx->msg_proc_ev); assert(client_ctx->msg_proc_ev); break; case MGMTD_BE_CONN_WRITES_ON: - thread_add_timer_msec( - client_ctx->tm, mgmt_be_client_resume_writes, - client_ctx, MGMTD_BE_MSG_WRITE_DELAY_MSEC, - &client_ctx->conn_writes_on); + thread_add_timer_msec(client_ctx->tm, + mgmt_be_client_resume_writes, client_ctx, + MGMTD_BE_MSG_WRITE_DELAY_MSEC, + &client_ctx->conn_writes_on); assert(client_ctx->conn_writes_on); break; case MGMTD_BE_SERVER: @@ -1376,16 +1123,10 @@ uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params, mgmt_be_client_ctx.client_params.conn_retry_intvl_sec = MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC; - assert(!mgmt_be_client_ctx.ibuf_fifo && !mgmt_be_client_ctx.ibuf_work && - !mgmt_be_client_ctx.obuf_fifo && !mgmt_be_client_ctx.obuf_work); - mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head); - mgmt_be_client_ctx.ibuf_fifo = stream_fifo_new(); - mgmt_be_client_ctx.ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - mgmt_be_client_ctx.obuf_fifo = stream_fifo_new(); - /* mgmt_be_client_ctx.obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - */ - mgmt_be_client_ctx.obuf_work = NULL; + mgmt_msg_init(&mgmt_be_client_ctx.mstate, MGMTD_BE_MAX_NUM_MSG_PROC, + MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN, + "BE-client"); /* Start trying to connect to MGMTD backend server immediately */ mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1); @@ -1465,18 +1206,11 @@ void mgmt_be_client_lib_destroy(uintptr_t lib_hndl) assert(client_ctx); MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'", - client_ctx->client_params.name); + client_ctx->client_params.name); mgmt_be_server_disconnect(client_ctx, false); - assert(mgmt_be_client_ctx.ibuf_fifo && mgmt_be_client_ctx.obuf_fifo); - - stream_fifo_free(mgmt_be_client_ctx.ibuf_fifo); - if (mgmt_be_client_ctx.ibuf_work) - stream_free(mgmt_be_client_ctx.ibuf_work); - stream_fifo_free(mgmt_be_client_ctx.obuf_fifo); - if (mgmt_be_client_ctx.obuf_work) - stream_free(mgmt_be_client_ctx.obuf_work); + mgmt_msg_destroy(&client_ctx->mstate); THREAD_OFF(client_ctx->conn_retry_tmr); THREAD_OFF(client_ctx->conn_read_ev); |
