]> git.puffer.fish Git - matthieu/frr.git/commitdiff
mgmtd: lib: utilize msglib constructed from the removed code
authorChristian Hopps <chopps@labn.net>
Wed, 8 Mar 2023 22:11:43 +0000 (17:11 -0500)
committerChristian Hopps <chopps@gmail.com>
Wed, 22 Mar 2023 05:22:56 +0000 (05:22 +0000)
Signed-off-by: Christian Hopps <chopps@labn.net>
lib/mgmt_be_client.c
lib/mgmt_be_client.h
lib/mgmt_fe_client.c
lib/mgmt_fe_client.h
mgmtd/mgmt_be_adapter.c
mgmtd/mgmt_be_adapter.h
mgmtd/mgmt_be_server.c
mgmtd/mgmt_fe_adapter.c
mgmtd/mgmt_fe_adapter.h
mgmtd/mgmt_fe_server.c
mgmtd/mgmt_txn.c

index 1e98c6123dd1d182bd35fb6cee736385078dd16a..4dc7b296b80596eae87039ab96c94c72eabe6756 100644 (file)
@@ -9,6 +9,7 @@
 #include "libfrr.h"
 #include "mgmtd/mgmt.h"
 #include "mgmt_be_client.h"
+#include "mgmt_msg.h"
 #include "mgmt_pb.h"
 #include "network.h"
 #include "stream.h"
@@ -111,14 +112,8 @@ struct mgmt_be_client_ctx {
        struct thread *conn_writes_on;
        struct thread *msg_proc_ev;
        uint32_t flags;
-       uint32_t num_msg_tx;
-       uint32_t num_msg_rx;
 
-       struct stream_fifo *ibuf_fifo;
-       struct stream *ibuf_work;
-       struct stream_fifo *obuf_fifo;
-       struct stream *obuf_work;
-       uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN];
+       struct mgmt_msg_state mstate;
 
        struct nb_config *candidate_config;
        struct nb_config *running_config;
@@ -143,7 +138,9 @@ struct mgmt_be_client_ctx {
 
 static bool mgmt_debug_be_client;
 
-static struct mgmt_be_client_ctx mgmt_be_client_ctx = {0};
+static struct mgmt_be_client_ctx mgmt_be_client_ctx = {
+       .conn_fd = -1,
+};
 
 const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = {
 #ifdef HAVE_STATICD
@@ -168,14 +165,13 @@ mgmt_be_server_disconnect(struct mgmt_be_client_ctx *client_ctx,
 {
        /* Notify client through registered callback (if any) */
        if (client_ctx->client_params.client_connect_notify)
-               (void)(*client_ctx->client_params
-                               .client_connect_notify)(
+               (void)(*client_ctx->client_params.client_connect_notify)(
                        (uintptr_t)client_ctx,
                        client_ctx->client_params.user_data, false);
 
-       if (client_ctx->conn_fd) {
+       if (client_ctx->conn_fd != -1) {
                close(client_ctx->conn_fd);
-               client_ctx->conn_fd = 0;
+               client_ctx->conn_fd = -1;
        }
 
        if (reconnect)
@@ -881,181 +877,47 @@ mgmt_be_client_handle_msg(struct mgmt_be_client_ctx *client_ctx,
        return 0;
 }
 
-static int
-mgmt_be_client_process_msg(struct mgmt_be_client_ctx *client_ctx,
-                             uint8_t *msg_buf, int bytes_read)
+static void mgmt_be_client_process_msg(void *user_ctx, uint8_t *data,
+                                      size_t len)
 {
+       struct mgmt_be_client_ctx *client_ctx = user_ctx;
        Mgmtd__BeMessage *be_msg;
-       struct mgmt_be_msg *msg;
-       uint16_t bytes_left;
-       uint16_t processed = 0;
-
-       MGMTD_BE_CLIENT_DBG(
-               "Got message of %d bytes from MGMTD Backend Server",
-               bytes_read);
-
-       bytes_left = bytes_read;
-       for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;
-            bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
-               msg = (struct mgmt_be_msg *)msg_buf;
-               if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) {
-                       MGMTD_BE_CLIENT_DBG(
-                               "Marker not found in message from MGMTD '%s'",
-                               client_ctx->client_params.name);
-                       break;
-               }
-
-               if (bytes_left < msg->hdr.len) {
-                       MGMTD_BE_CLIENT_DBG(
-                               "Incomplete message of %d bytes (epxected: %u) from MGMTD '%s'",
-                               bytes_left, msg->hdr.len,
-                               client_ctx->client_params.name);
-                       break;
-               }
-
-               be_msg = mgmtd__be_message__unpack(
-                       NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN),
-                       msg->payload);
-               if (!be_msg) {
-                       MGMTD_BE_CLIENT_DBG(
-                               "Failed to decode %d bytes from MGMTD '%s'",
-                               msg->hdr.len, client_ctx->client_params.name);
-                       continue;
-               }
 
-               (void)mgmt_be_client_handle_msg(client_ctx, be_msg);
-               mgmtd__be_message__free_unpacked(be_msg, NULL);
-               processed++;
-               client_ctx->num_msg_rx++;
+       be_msg = mgmtd__be_message__unpack(NULL, len, data);
+       if (!be_msg) {
+               MGMTD_BE_CLIENT_DBG("Failed to decode %zu bytes from server",
+                                   len);
+               return;
        }
-
-       return processed;
+       MGMTD_BE_CLIENT_DBG(
+               "Decoded %zu bytes of message(msg: %u/%u) from server", len,
+               be_msg->message_case, be_msg->message_case);
+       (void)mgmt_be_client_handle_msg(client_ctx, be_msg);
+       mgmtd__be_message__free_unpacked(be_msg, NULL);
 }
 
 static void mgmt_be_client_proc_msgbufs(struct thread *thread)
 {
-       struct mgmt_be_client_ctx *client_ctx;
-       struct stream *work;
-       int processed = 0;
-
-       client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx);
-
-       if (client_ctx->conn_fd == 0)
-               return;
-
-       for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) {
-               work = stream_fifo_pop_safe(client_ctx->ibuf_fifo);
-               if (!work)
-                       break;
-
-               processed += mgmt_be_client_process_msg(
-                       client_ctx, STREAM_DATA(work), stream_get_endp(work));
-
-               if (work != client_ctx->ibuf_work) {
-                       /* Free it up */
-                       stream_free(work);
-               } else {
-                       /* Reset stream buffer for next read */
-                       stream_reset(work);
-               }
-       }
+       struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread);
 
-       /*
-        * If we have more to process, reschedule for processing it.
-        */
-       if (stream_fifo_head(client_ctx->ibuf_fifo))
-               mgmt_be_client_register_event(client_ctx,
-                                                MGMTD_BE_PROC_MSG);
+       if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_be_client_process_msg,
+                             client_ctx, mgmt_debug_be_client))
+               mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
 }
 
 static void mgmt_be_client_read(struct thread *thread)
 {
-       struct mgmt_be_client_ctx *client_ctx;
-       int bytes_read, msg_cnt;
-       size_t total_bytes, bytes_left;
-       struct mgmt_be_msg_hdr *msg_hdr;
-       bool incomplete = false;
-
-       client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx && client_ctx->conn_fd);
-
-       total_bytes = 0;
-       bytes_left = STREAM_SIZE(client_ctx->ibuf_work)
-                    - stream_get_endp(client_ctx->ibuf_work);
-       for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
-               bytes_read = stream_read_try(client_ctx->ibuf_work,
-                                            client_ctx->conn_fd, bytes_left);
-               MGMTD_BE_CLIENT_DBG(
-                       "Got %d bytes of message from MGMTD Backend server",
-                       bytes_read);
-               /* -2 is normal nothing read, and to retry */
-               if (bytes_read == -2)
-                       break;
-               if (bytes_read <= 0) {
-                       if (bytes_read == 0) {
-                               MGMTD_BE_CLIENT_ERR(
-                                       "Got EOF/disconnect while reading from MGMTD Frontend server.");
-                       } else {
-                               MGMTD_BE_CLIENT_ERR(
-                                       "Got error (%d) while reading from MGMTD Backend server. Err: '%s'",
-                                       bytes_read, safe_strerror(errno));
-                       }
-                       mgmt_be_server_disconnect(client_ctx, true);
-                       return;
-               }
-               total_bytes += bytes_read;
-               bytes_left -= bytes_read;
-       }
-       /*
-        * Check if we have read complete messages or not.
-        */
-       stream_set_getp(client_ctx->ibuf_work, 0);
-       total_bytes = 0;
-       msg_cnt = 0;
-       bytes_left = stream_get_endp(client_ctx->ibuf_work);
-       for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
-               msg_hdr = (struct mgmt_be_msg_hdr
-                                  *)(STREAM_DATA(client_ctx->ibuf_work)
-                                     + total_bytes);
-               if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) {
-                       /* Corrupted buffer. Force disconnect?? */
-                       MGMTD_BE_CLIENT_ERR(
-                               "Received corrupted buffer from MGMTD backend server.");
-                       mgmt_be_server_disconnect(client_ctx, true);
-                       return;
-               }
-               if (msg_hdr->len > bytes_left)
-                       break;
-
-               total_bytes += msg_hdr->len;
-               bytes_left -= msg_hdr->len;
-               msg_cnt++;
-       }
+       struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread);
+       enum mgmt_msg_rsched rv;
 
-       if (!msg_cnt)
-               goto resched;
-
-       if (bytes_left > 0)
-               incomplete = true;
-
-       /*
-        * We have read one or several messages.
-        * Schedule processing them now.
-        */
-       msg_hdr =
-               (struct mgmt_be_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) +
-                                          total_bytes);
-       stream_set_endp(client_ctx->ibuf_work, total_bytes);
-       stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work);
-       client_ctx->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
-       if (incomplete) {
-               stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left);
-               stream_set_endp(client_ctx->ibuf_work, bytes_left);
+       rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd,
+                          mgmt_debug_be_client);
+       if (rv == MSR_DISCONNECT) {
+               mgmt_be_server_disconnect(client_ctx, true);
+               return;
        }
-       mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
-
-resched:
+       if (rv == MSR_SCHED_BOTH)
+               mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
        mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ);
 }
 
@@ -1072,9 +934,7 @@ mgmt_be_client_writes_on(struct mgmt_be_client_ctx *client_ctx)
 {
        MGMTD_BE_CLIENT_DBG("Resume writing msgs");
        UNSET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF);
-       if (client_ctx->obuf_work
-           || stream_fifo_count_safe(client_ctx->obuf_fifo))
-               mgmt_be_client_sched_msg_write(client_ctx);
+       mgmt_be_client_sched_msg_write(client_ctx);
 }
 
 static inline void
@@ -1085,99 +945,39 @@ mgmt_be_client_writes_off(struct mgmt_be_client_ctx *client_ctx)
 }
 
 static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx,
-                                     Mgmtd__BeMessage *be_msg)
+                                  Mgmtd__BeMessage *be_msg)
 {
-       size_t msg_size;
-       uint8_t *msg_buf = client_ctx->msg_buf;
-       struct mgmt_be_msg *msg;
-
-       if (client_ctx->conn_fd == 0)
-               return -1;
-
-       msg_size = mgmtd__be_message__get_packed_size(be_msg);
-       msg_size += MGMTD_BE_MSG_HDR_LEN;
-       if (msg_size > MGMTD_BE_MSG_MAX_LEN) {
-               MGMTD_BE_CLIENT_ERR(
-                       "Message size %d more than max size'%d. Not sending!'",
-                       (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN);
+       if (client_ctx->conn_fd == -1) {
+               MGMTD_BE_CLIENT_DBG("can't send message on closed connection");
                return -1;
        }
 
-       msg = (struct mgmt_be_msg *)msg_buf;
-       msg->hdr.marker = MGMTD_BE_MSG_MARKER;
-       msg->hdr.len = (uint16_t)msg_size;
-       mgmtd__be_message__pack(be_msg, msg->payload);
-
-       if (!client_ctx->obuf_work)
-               client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
-       if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) {
-               stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
-               client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
-       }
-       stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size);
-
+       int rv = mgmt_msg_send_msg(
+               &client_ctx->mstate, be_msg,
+               mgmtd__be_message__get_packed_size(be_msg),
+               (size_t(*)(void *, void *))mgmtd__be_message__pack,
+               mgmt_debug_be_client);
        mgmt_be_client_sched_msg_write(client_ctx);
-       client_ctx->num_msg_tx++;
-       return 0;
+       return rv;
 }
 
 static void mgmt_be_client_write(struct thread *thread)
 {
-       int bytes_written = 0;
-       int processed = 0;
-       int msg_size = 0;
-       struct stream *s = NULL;
-       struct stream *free = NULL;
-       struct mgmt_be_client_ctx *client_ctx;
-
-       client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx && client_ctx->conn_fd);
-
-       /* Ensure pushing any pending write buffer to FIFO */
-       if (client_ctx->obuf_work) {
-               stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
-               client_ctx->obuf_work = NULL;
-       }
-
-       for (s = stream_fifo_head(client_ctx->obuf_fifo);
-            s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE;
-            s = stream_fifo_head(client_ctx->obuf_fifo)) {
-               /* msg_size = (int)stream_get_size(s); */
-               msg_size = (int)STREAM_READABLE(s);
-               bytes_written = stream_flush(s, client_ctx->conn_fd);
-               if (bytes_written == -1
-                   && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                       mgmt_be_client_register_event(
-                               client_ctx, MGMTD_BE_CONN_WRITE);
-                       return;
-               } else if (bytes_written != msg_size) {
-                       MGMTD_BE_CLIENT_ERR(
-                               "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'",
-                               msg_size, bytes_written, safe_strerror(errno));
-                       if (bytes_written > 0) {
-                               stream_forward_getp(s, (size_t)bytes_written);
-                               stream_pulldown(s);
-                               mgmt_be_client_register_event(
-                                       client_ctx, MGMTD_BE_CONN_WRITE);
-                               return;
-                       }
-                       mgmt_be_server_disconnect(client_ctx, true);
-                       return;
-               }
-
-               free = stream_fifo_pop(client_ctx->obuf_fifo);
-               stream_free(free);
-               MGMTD_BE_CLIENT_DBG(
-                       "Wrote %d bytes of message to MGMTD Backend client socket.'",
-                       bytes_written);
-               processed++;
-       }
-
-       if (s) {
+       struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread);
+       enum mgmt_msg_wsched rv;
+
+       rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd,
+                           mgmt_debug_be_client);
+       if (rv == MSW_SCHED_STREAM)
+               mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_WRITE);
+       else if (rv == MSW_DISCONNECT)
+               mgmt_be_server_disconnect(client_ctx, true);
+       else if (rv == MSW_SCHED_WRITES_OFF) {
                mgmt_be_client_writes_off(client_ctx);
                mgmt_be_client_register_event(client_ctx,
-                                                MGMTD_BE_CONN_WRITES_ON);
-       }
+                                             MGMTD_BE_CONN_WRITES_ON);
+       } else
+               assert(rv == MSW_SCHED_NONE);
 }
 
 static void mgmt_be_client_resume_writes(struct thread *thread)
@@ -1185,15 +985,14 @@ static void mgmt_be_client_resume_writes(struct thread *thread)
        struct mgmt_be_client_ctx *client_ctx;
 
        client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx && client_ctx->conn_fd);
+       assert(client_ctx && client_ctx->conn_fd != -1);
 
        mgmt_be_client_writes_on(client_ctx);
 }
 
 static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx,
-                                     bool subscr_xpaths,
-                                     uint16_t num_reg_xpaths,
-                                     char **reg_xpaths)
+                                  bool subscr_xpaths, uint16_t num_reg_xpaths,
+                                  char **reg_xpaths)
 {
        Mgmtd__BeMessage be_msg;
        Mgmtd__BeSubscribeReq subscr_req;
@@ -1214,86 +1013,35 @@ static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx,
        return mgmt_be_client_send_msg(client_ctx, &be_msg);
 }
 
-static int mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx)
+static void mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx)
 {
-       int ret, sock, len;
-       struct sockaddr_un addr;
-
-       MGMTD_BE_CLIENT_DBG("Trying to connect to MGMTD Backend server at %s",
-                            MGMTD_BE_SERVER_PATH);
-
-       assert(!client_ctx->conn_fd);
+       const char *dbgtag = mgmt_debug_be_client ? "BE-client" : NULL;
 
-       sock = socket(AF_UNIX, SOCK_STREAM, 0);
-       if (sock < 0) {
-               MGMTD_BE_CLIENT_ERR("Failed to create socket");
-               goto mgmt_be_server_connect_failed;
-       }
-
-       MGMTD_BE_CLIENT_DBG(
-               "Created MGMTD Backend server socket successfully!");
+       assert(client_ctx->conn_fd == -1);
+       client_ctx->conn_fd = mgmt_msg_connect(
+               MGMTD_BE_SERVER_PATH, MGMTD_SOCKET_BE_SEND_BUF_SIZE,
+               MGMTD_SOCKET_BE_RECV_BUF_SIZE, dbgtag);
 
-       memset(&addr, 0, sizeof(struct sockaddr_un));
-       addr.sun_family = AF_UNIX;
-       strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path));
-#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
-       len = addr.sun_len = SUN_LEN(&addr);
-#else
-       len = sizeof(addr.sun_family) + strlen(addr.sun_path);
-#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
-
-       ret = connect(sock, (struct sockaddr *)&addr, len);
-       if (ret < 0) {
-               MGMTD_BE_CLIENT_ERR(
-                       "Failed to connect to MGMTD Backend Server at %s. Err: %s",
-                       addr.sun_path, safe_strerror(errno));
-               close(sock);
-               goto mgmt_be_server_connect_failed;
+       /* Send SUBSCRIBE_REQ message */
+       if (client_ctx->conn_fd == -1 ||
+           mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) {
+               mgmt_be_server_disconnect(client_ctx, true);
+               return;
        }
 
-       MGMTD_BE_CLIENT_DBG(
-               "Connected to MGMTD Backend Server at %s successfully!",
-               addr.sun_path);
-       client_ctx->conn_fd = sock;
-
-       /* Make client socket non-blocking.  */
-       set_nonblocking(sock);
-       setsockopt_so_sendbuf(client_ctx->conn_fd,
-                             MGMTD_SOCKET_BE_SEND_BUF_SIZE);
-       setsockopt_so_recvbuf(client_ctx->conn_fd,
-                             MGMTD_SOCKET_BE_RECV_BUF_SIZE);
-
+       /* Start reading from the socket */
        mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ);
 
        /* Notify client through registered callback (if any) */
        if (client_ctx->client_params.client_connect_notify)
-               (void)(*client_ctx->client_params
-                               .client_connect_notify)(
+               (void)(*client_ctx->client_params.client_connect_notify)(
                        (uintptr_t)client_ctx,
                        client_ctx->client_params.user_data, true);
-
-       /* Send SUBSCRIBE_REQ message */
-       if (mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0)
-               goto mgmt_be_server_connect_failed;
-
-       return 0;
-
-mgmt_be_server_connect_failed:
-       if (sock && sock != client_ctx->conn_fd)
-               close(sock);
-
-       mgmt_be_server_disconnect(client_ctx, true);
-       return -1;
 }
 
 static void mgmt_be_client_conn_timeout(struct thread *thread)
 {
-       struct mgmt_be_client_ctx *client_ctx;
-
-       client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx);
-
-       mgmt_be_server_connect(client_ctx);
+       mgmt_be_server_connect(THREAD_ARG(thread));
 }
 
 static void
@@ -1317,16 +1065,15 @@ mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx,
                break;
        case MGMTD_BE_PROC_MSG:
                tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC;
-               thread_add_timer_tv(client_ctx->tm,
-                                   mgmt_be_client_proc_msgbufs, client_ctx,
-                                   &tv, &client_ctx->msg_proc_ev);
+               thread_add_timer_tv(client_ctx->tm, mgmt_be_client_proc_msgbufs,
+                                   client_ctx, &tv, &client_ctx->msg_proc_ev);
                assert(client_ctx->msg_proc_ev);
                break;
        case MGMTD_BE_CONN_WRITES_ON:
-               thread_add_timer_msec(
-                       client_ctx->tm, mgmt_be_client_resume_writes,
-                       client_ctx, MGMTD_BE_MSG_WRITE_DELAY_MSEC,
-                       &client_ctx->conn_writes_on);
+               thread_add_timer_msec(client_ctx->tm,
+                                     mgmt_be_client_resume_writes, client_ctx,
+                                     MGMTD_BE_MSG_WRITE_DELAY_MSEC,
+                                     &client_ctx->conn_writes_on);
                assert(client_ctx->conn_writes_on);
                break;
        case MGMTD_BE_SERVER:
@@ -1376,16 +1123,10 @@ uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params,
                mgmt_be_client_ctx.client_params.conn_retry_intvl_sec =
                        MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC;
 
-       assert(!mgmt_be_client_ctx.ibuf_fifo && !mgmt_be_client_ctx.ibuf_work &&
-              !mgmt_be_client_ctx.obuf_fifo && !mgmt_be_client_ctx.obuf_work);
-
        mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head);
-       mgmt_be_client_ctx.ibuf_fifo = stream_fifo_new();
-       mgmt_be_client_ctx.ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
-       mgmt_be_client_ctx.obuf_fifo = stream_fifo_new();
-       /* mgmt_be_client_ctx.obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
-        */
-       mgmt_be_client_ctx.obuf_work = NULL;
+       mgmt_msg_init(&mgmt_be_client_ctx.mstate, MGMTD_BE_MAX_NUM_MSG_PROC,
+                     MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN,
+                     "BE-client");
 
        /* Start trying to connect to MGMTD backend server immediately */
        mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1);
@@ -1465,18 +1206,11 @@ void mgmt_be_client_lib_destroy(uintptr_t lib_hndl)
        assert(client_ctx);
 
        MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'",
-                            client_ctx->client_params.name);
+                           client_ctx->client_params.name);
 
        mgmt_be_server_disconnect(client_ctx, false);
 
-       assert(mgmt_be_client_ctx.ibuf_fifo && mgmt_be_client_ctx.obuf_fifo);
-
-       stream_fifo_free(mgmt_be_client_ctx.ibuf_fifo);
-       if (mgmt_be_client_ctx.ibuf_work)
-               stream_free(mgmt_be_client_ctx.ibuf_work);
-       stream_fifo_free(mgmt_be_client_ctx.obuf_fifo);
-       if (mgmt_be_client_ctx.obuf_work)
-               stream_free(mgmt_be_client_ctx.obuf_work);
+       mgmt_msg_destroy(&client_ctx->mstate);
 
        THREAD_OFF(client_ctx->conn_retry_tmr);
        THREAD_OFF(client_ctx->conn_read_ev);
index bf875f5a451a0dfb36139ff1c9248cf5cb094bf7..66bc62fb084f3871b354b4bb54acc20c3fa92f4e 100644 (file)
@@ -12,6 +12,8 @@
 extern "C" {
 #endif
 
+#include "northbound.h"
+#include "mgmt_pb.h"
 #include "mgmtd/mgmt_defines.h"
 
 /***************************************************************
@@ -80,18 +82,6 @@ enum mgmt_be_client_id {
 
 #define MGMTD_BE_MAX_CLIENTS_PER_XPATH_REG 32
 
-struct mgmt_be_msg_hdr {
-       uint16_t marker;
-       uint16_t len; /* Includes header */
-};
-#define MGMTD_BE_MSG_HDR_LEN sizeof(struct mgmt_be_msg_hdr)
-#define MGMTD_BE_MSG_MARKER 0xfeed
-
-struct mgmt_be_msg {
-       struct mgmt_be_msg_hdr hdr;
-       uint8_t payload[];
-};
-
 struct mgmt_be_client_txn_ctx {
        uintptr_t *user_ctx;
 };
index 22c3a06b9eb3829d720efcb9d845d9983adfcf5f..73154401ec8ca0447759161a20fbd38c5fcf77b1 100644 (file)
@@ -9,6 +9,7 @@
 #include "memory.h"
 #include "libfrr.h"
 #include "mgmt_fe_client.h"
+#include "mgmt_msg.h"
 #include "mgmt_pb.h"
 #include "network.h"
 #include "stream.h"
@@ -55,13 +56,8 @@ struct mgmt_fe_client_ctx {
        struct thread *conn_writes_on;
        struct thread *msg_proc_ev;
        uint32_t flags;
-       uint32_t num_msg_tx;
-       uint32_t num_msg_rx;
 
-       struct stream_fifo *ibuf_fifo;
-       struct stream *ibuf_work;
-       struct stream_fifo *obuf_fifo;
-       struct stream *obuf_work;
+       struct mgmt_msg_state mstate;
 
        struct mgmt_fe_client_params client_params;
 
@@ -75,7 +71,9 @@ struct mgmt_fe_client_ctx {
 
 static bool mgmt_debug_fe_client;
 
-static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {0};
+static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {
+       .conn_fd = -1,
+};
 
 /* Forward declarations */
 static void
@@ -124,9 +122,9 @@ static void
 mgmt_fe_server_disconnect(struct mgmt_fe_client_ctx *client_ctx,
                              bool reconnect)
 {
-       if (client_ctx->conn_fd) {
+       if (client_ctx->conn_fd != -1) {
                close(client_ctx->conn_fd);
-               client_ctx->conn_fd = 0;
+               client_ctx->conn_fd = -1;
        }
 
        if (reconnect)
@@ -148,9 +146,7 @@ mgmt_fe_client_writes_on(struct mgmt_fe_client_ctx *client_ctx)
 {
        MGMTD_FE_CLIENT_DBG("Resume writing msgs");
        UNSET_FLAG(client_ctx->flags, MGMTD_FE_CLIENT_FLAGS_WRITES_OFF);
-       if (client_ctx->obuf_work
-           || stream_fifo_count_safe(client_ctx->obuf_fifo))
-               mgmt_fe_client_sched_msg_write(client_ctx);
+       mgmt_fe_client_sched_msg_write(client_ctx);
 }
 
 static inline void
@@ -160,101 +156,42 @@ mgmt_fe_client_writes_off(struct mgmt_fe_client_ctx *client_ctx)
        MGMTD_FE_CLIENT_DBG("Paused writing msgs");
 }
 
-static int
-mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx,
-                           Mgmtd__FeMessage *fe_msg)
+static int mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx,
+                                  Mgmtd__FeMessage *fe_msg)
 {
-       size_t msg_size;
-       uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN];
-       struct mgmt_fe_msg *msg;
-
-       if (client_ctx->conn_fd == 0)
-               return -1;
-
-       msg_size = mgmtd__fe_message__get_packed_size(fe_msg);
-       msg_size += MGMTD_FE_MSG_HDR_LEN;
-       if (msg_size > sizeof(msg_buf)) {
-               MGMTD_FE_CLIENT_ERR(
-                       "Message size %d more than max size'%d. Not sending!'",
-                       (int)msg_size, (int)sizeof(msg_buf));
+       /* users current expect this to fail here */
+       if (client_ctx->conn_fd == -1) {
+               MGMTD_FE_CLIENT_DBG("can't send message on closed connection");
                return -1;
        }
 
-       msg = (struct mgmt_fe_msg *)msg_buf;
-       msg->hdr.marker = MGMTD_FE_MSG_MARKER;
-       msg->hdr.len = (uint16_t)msg_size;
-       mgmtd__fe_message__pack(fe_msg, msg->payload);
-
-       if (!client_ctx->obuf_work)
-               client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
-       if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) {
-               stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
-               client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
-       }
-       stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size);
-
+       int rv = mgmt_msg_send_msg(
+               &client_ctx->mstate, fe_msg,
+               mgmtd__fe_message__get_packed_size(fe_msg),
+               (size_t(*)(void *, void *))mgmtd__fe_message__pack,
+               mgmt_debug_fe_client);
        mgmt_fe_client_sched_msg_write(client_ctx);
-       client_ctx->num_msg_tx++;
-       return 0;
+       return rv;
 }
 
 static void mgmt_fe_client_write(struct thread *thread)
 {
-       int bytes_written = 0;
-       int processed = 0;
-       int msg_size = 0;
-       struct stream *s = NULL;
-       struct stream *free = NULL;
        struct mgmt_fe_client_ctx *client_ctx;
+       enum mgmt_msg_wsched rv;
 
        client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx && client_ctx->conn_fd);
-
-       /* Ensure pushing any pending write buffer to FIFO */
-       if (client_ctx->obuf_work) {
-               stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
-               client_ctx->obuf_work = NULL;
-       }
-
-       for (s = stream_fifo_head(client_ctx->obuf_fifo);
-            s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE;
-            s = stream_fifo_head(client_ctx->obuf_fifo)) {
-               /* msg_size = (int)stream_get_size(s); */
-               msg_size = (int)STREAM_READABLE(s);
-               bytes_written = stream_flush(s, client_ctx->conn_fd);
-               if (bytes_written == -1
-                   && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                       mgmt_fe_client_register_event(
-                               client_ctx, MGMTD_FE_CONN_WRITE);
-                       return;
-               } else if (bytes_written != msg_size) {
-                       MGMTD_FE_CLIENT_ERR(
-                               "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'",
-                               msg_size, bytes_written, safe_strerror(errno));
-                       if (bytes_written > 0) {
-                               stream_forward_getp(s, (size_t)bytes_written);
-                               stream_pulldown(s);
-                               mgmt_fe_client_register_event(
-                                       client_ctx, MGMTD_FE_CONN_WRITE);
-                               return;
-                       }
-                       mgmt_fe_server_disconnect(client_ctx, true);
-                       return;
-               }
-
-               free = stream_fifo_pop(client_ctx->obuf_fifo);
-               stream_free(free);
-               MGMTD_FE_CLIENT_DBG(
-                       "Wrote %d bytes of message to MGMTD Backend client socket.'",
-                       bytes_written);
-               processed++;
-       }
-
-       if (s) {
+       rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd,
+                           mgmt_debug_fe_client);
+       if (rv == MSW_SCHED_STREAM)
+               mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_WRITE);
+       else if (rv == MSW_DISCONNECT)
+               mgmt_fe_server_disconnect(client_ctx, true);
+       else if (rv == MSW_SCHED_WRITES_OFF) {
                mgmt_fe_client_writes_off(client_ctx);
                mgmt_fe_client_register_event(client_ctx,
-                                                 MGMTD_FE_CONN_WRITES_ON);
-       }
+                                             MGMTD_FE_CONN_WRITES_ON);
+       } else
+               assert(rv == MSW_SCHED_NONE);
 }
 
 static void mgmt_fe_client_resume_writes(struct thread *thread)
@@ -262,7 +199,7 @@ static void mgmt_fe_client_resume_writes(struct thread *thread)
        struct mgmt_fe_client_ctx *client_ctx;
 
        client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx && client_ctx->conn_fd);
+       assert(client_ctx && client_ctx->conn_fd != -1);
 
        mgmt_fe_client_writes_on(client_ctx);
 }
@@ -713,271 +650,83 @@ mgmt_fe_client_handle_msg(struct mgmt_fe_client_ctx *client_ctx,
        return 0;
 }
 
-static int
-mgmt_fe_client_process_msg(struct mgmt_fe_client_ctx *client_ctx,
-                              uint8_t *msg_buf, int bytes_read)
+static void mgmt_fe_client_process_msg(void *user_ctx, uint8_t *data,
+                                      size_t len)
 {
+       struct mgmt_fe_client_ctx *client_ctx = user_ctx;
        Mgmtd__FeMessage *fe_msg;
-       struct mgmt_fe_msg *msg;
-       uint16_t bytes_left;
-       uint16_t processed = 0;
 
-       MGMTD_FE_CLIENT_DBG(
-               "Have %u bytes of messages from MGMTD Frontend server to .",
-               bytes_read);
-
-       bytes_left = bytes_read;
-       for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;
-            bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
-               msg = (struct mgmt_fe_msg *)msg_buf;
-               if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) {
-                       MGMTD_FE_CLIENT_DBG(
-                               "Marker not found in message from MGMTD Frontend server.");
-                       break;
-               }
-
-               if (bytes_left < msg->hdr.len) {
-                       MGMTD_FE_CLIENT_DBG(
-                               "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend server.",
-                               bytes_left, msg->hdr.len);
-                       break;
-               }
-
-               fe_msg = mgmtd__fe_message__unpack(
-                       NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN),
-                       msg->payload);
-               if (!fe_msg) {
-                       MGMTD_FE_CLIENT_DBG(
-                               "Failed to decode %d bytes from MGMTD Frontend server.",
-                               msg->hdr.len);
-                       continue;
-               }
-
-               MGMTD_FE_CLIENT_DBG(
-                       "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend server",
-                       msg->hdr.len, fe_msg->message_case,
-                       fe_msg->message_case);
-
-               (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg);
-
-               mgmtd__fe_message__free_unpacked(fe_msg, NULL);
-               processed++;
-               client_ctx->num_msg_rx++;
+       fe_msg = mgmtd__fe_message__unpack(NULL, len, data);
+       if (!fe_msg) {
+               MGMTD_FE_CLIENT_DBG("Failed to decode %zu bytes from server.",
+                                   len);
+               return;
        }
-
-       return processed;
+       MGMTD_FE_CLIENT_DBG(
+               "Decoded %zu bytes of message(msg: %u/%u) from server", len,
+               fe_msg->message_case, fe_msg->message_case);
+       (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg);
+       mgmtd__fe_message__free_unpacked(fe_msg, NULL);
 }
 
 static void mgmt_fe_client_proc_msgbufs(struct thread *thread)
 {
        struct mgmt_fe_client_ctx *client_ctx;
-       struct stream *work;
-       int processed = 0;
 
        client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx && client_ctx->conn_fd);
-
-       for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) {
-               work = stream_fifo_pop_safe(client_ctx->ibuf_fifo);
-               if (!work)
-                       break;
-
-               processed += mgmt_fe_client_process_msg(
-                       client_ctx, STREAM_DATA(work), stream_get_endp(work));
-
-               if (work != client_ctx->ibuf_work) {
-                       /* Free it up */
-                       stream_free(work);
-               } else {
-                       /* Reset stream buffer for next read */
-                       stream_reset(work);
-               }
-       }
-
-       /*
-        * If we have more to process, reschedule for processing it.
-        */
-       if (stream_fifo_head(client_ctx->ibuf_fifo))
-               mgmt_fe_client_register_event(client_ctx,
-                                                 MGMTD_FE_PROC_MSG);
+       if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_fe_client_process_msg,
+                             client_ctx, mgmt_debug_fe_client))
+               mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
 }
 
 static void mgmt_fe_client_read(struct thread *thread)
 {
        struct mgmt_fe_client_ctx *client_ctx;
-       int bytes_read, msg_cnt;
-       size_t total_bytes, bytes_left;
-       struct mgmt_fe_msg_hdr *msg_hdr;
-       bool incomplete = false;
+       enum mgmt_msg_rsched rv;
 
        client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx && client_ctx->conn_fd);
-
-       total_bytes = 0;
-       bytes_left = STREAM_SIZE(client_ctx->ibuf_work)
-                    - stream_get_endp(client_ctx->ibuf_work);
-       for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
-               bytes_read = stream_read_try(client_ctx->ibuf_work,
-                                            client_ctx->conn_fd, bytes_left);
-               MGMTD_FE_CLIENT_DBG(
-                       "Got %d bytes of message from MGMTD Frontend server",
-                       bytes_read);
-               /* -2 is normal nothing read, and to retry */
-               if (bytes_read == -2)
-                       break;
-               if (bytes_read <= 0) {
-                       if (bytes_read == 0) {
-                               MGMTD_FE_CLIENT_ERR(
-                                       "Got EOF/disconnect while reading from MGMTD Frontend server.");
-                       } else {
-                               /* Fatal error */
-                               MGMTD_FE_CLIENT_ERR(
-                                       "Got error (%d) while reading from MGMTD Frontend server. Err: '%s'",
-                                       bytes_read, safe_strerror(errno));
-                       }
-                       mgmt_fe_server_disconnect(client_ctx, true);
-                       return;
-               }
-               total_bytes += bytes_read;
-               bytes_left -= bytes_read;
-       }
-
-       /*
-        * Check if we have read complete messages or not.
-        */
-       stream_set_getp(client_ctx->ibuf_work, 0);
-       total_bytes = 0;
-       msg_cnt = 0;
-       bytes_left = stream_get_endp(client_ctx->ibuf_work);
-       for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
-               msg_hdr = (struct mgmt_fe_msg_hdr
-                                  *)(STREAM_DATA(client_ctx->ibuf_work)
-                                     + total_bytes);
-               if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) {
-                       /* Corrupted buffer. Force disconnect?? */
-                       MGMTD_FE_CLIENT_ERR(
-                               "Received corrupted buffer from MGMTD frontend server.");
-                       mgmt_fe_server_disconnect(client_ctx, true);
-                       return;
-               }
-               if (msg_hdr->len > bytes_left)
-                       break;
-
-               total_bytes += msg_hdr->len;
-               bytes_left -= msg_hdr->len;
-               msg_cnt++;
-       }
-
-       if (!msg_cnt)
-               goto resched;
-
-       if (bytes_left > 0)
-               incomplete = true;
 
-       /*
-        * We have read one or several messages.
-        * Schedule processing them now.
-        */
-       msg_hdr =
-               (struct mgmt_fe_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work)
-                                              + total_bytes);
-       stream_set_endp(client_ctx->ibuf_work, total_bytes);
-       stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work);
-       client_ctx->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
-       if (incomplete) {
-               stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left);
-               stream_set_endp(client_ctx->ibuf_work, bytes_left);
+       rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd,
+                          mgmt_debug_fe_client);
+       if (rv == MSR_DISCONNECT) {
+               mgmt_fe_server_disconnect(client_ctx, true);
+               return;
        }
-
-       mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
-
-resched:
+       if (rv == MSR_SCHED_BOTH)
+               mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
        mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ);
 }
 
-static int mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx)
+static void mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx)
 {
-       int ret, sock, len;
-       struct sockaddr_un addr;
-
-       MGMTD_FE_CLIENT_DBG(
-               "Trying to connect to MGMTD Frontend server at %s",
-               MGMTD_FE_SERVER_PATH);
-
-       assert(!client_ctx->conn_fd);
+       const char *dbgtag = mgmt_debug_fe_client ? "FE-client" : NULL;
 
-       sock = socket(AF_UNIX, SOCK_STREAM, 0);
-       if (sock < 0) {
-               MGMTD_FE_CLIENT_ERR("Failed to create socket");
-               goto mgmt_fe_server_connect_failed;
-       }
+       assert(client_ctx->conn_fd == -1);
+       client_ctx->conn_fd = mgmt_msg_connect(
+               MGMTD_FE_SERVER_PATH, MGMTD_SOCKET_FE_SEND_BUF_SIZE,
+               MGMTD_SOCKET_FE_RECV_BUF_SIZE, dbgtag);
 
-       MGMTD_FE_CLIENT_DBG(
-               "Created MGMTD Frontend server socket successfully!");
-
-       memset(&addr, 0, sizeof(struct sockaddr_un));
-       addr.sun_family = AF_UNIX;
-       strlcpy(addr.sun_path, MGMTD_FE_SERVER_PATH, sizeof(addr.sun_path));
-#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
-       len = addr.sun_len = SUN_LEN(&addr);
-#else
-       len = sizeof(addr.sun_family) + strlen(addr.sun_path);
-#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
-
-       ret = connect(sock, (struct sockaddr *)&addr, len);
-       if (ret < 0) {
-               MGMTD_FE_CLIENT_ERR(
-                       "Failed to connect to MGMTD Frontend Server at %s. Err: %s",
-                       addr.sun_path, safe_strerror(errno));
-               close(sock);
-               goto mgmt_fe_server_connect_failed;
+       /* Send REGISTER_REQ message */
+       if (client_ctx->conn_fd == -1 ||
+           mgmt_fe_send_register_req(client_ctx) != 0) {
+               mgmt_fe_server_disconnect(client_ctx, true);
+               return;
        }
 
-       MGMTD_FE_CLIENT_DBG(
-               "Connected to MGMTD Frontend Server at %s successfully!",
-               addr.sun_path);
-       client_ctx->conn_fd = sock;
-
-       /* Make client socket non-blocking.  */
-       set_nonblocking(sock);
-       setsockopt_so_sendbuf(client_ctx->conn_fd,
-                             MGMTD_SOCKET_FE_SEND_BUF_SIZE);
-       setsockopt_so_recvbuf(client_ctx->conn_fd,
-                             MGMTD_SOCKET_FE_RECV_BUF_SIZE);
-
-       thread_add_read(client_ctx->tm, mgmt_fe_client_read,
-                       (void *)&mgmt_fe_client_ctx, client_ctx->conn_fd,
-                       &client_ctx->conn_read_ev);
-       assert(client_ctx->conn_read_ev);
-
-       /* Send REGISTER_REQ message */
-       if (mgmt_fe_send_register_req(client_ctx) != 0)
-               goto mgmt_fe_server_connect_failed;
+       /* Start reading from the socket */
+       mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ);
 
        /* Notify client through registered callback (if any) */
        if (client_ctx->client_params.client_connect_notify)
                (void)(*client_ctx->client_params.client_connect_notify)(
                        (uintptr_t)client_ctx,
                        client_ctx->client_params.user_data, true);
-
-       return 0;
-
-mgmt_fe_server_connect_failed:
-       if (sock && sock != client_ctx->conn_fd)
-               close(sock);
-
-       mgmt_fe_server_disconnect(client_ctx, true);
-       return -1;
 }
 
+
 static void mgmt_fe_client_conn_timeout(struct thread *thread)
 {
-       struct mgmt_fe_client_ctx *client_ctx;
-
-       client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
-       assert(client_ctx);
-
-       mgmt_fe_server_connect(client_ctx);
+       mgmt_fe_server_connect(THREAD_ARG(thread));
 }
 
 static void
@@ -1046,16 +795,9 @@ uintptr_t mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params,
                mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec =
                        MGMTD_FE_DEFAULT_CONN_RETRY_INTVL_SEC;
 
-       assert(!mgmt_fe_client_ctx.ibuf_fifo
-              && !mgmt_fe_client_ctx.ibuf_work
-              && !mgmt_fe_client_ctx.obuf_fifo
-              && !mgmt_fe_client_ctx.obuf_work);
-
-       mgmt_fe_client_ctx.ibuf_fifo = stream_fifo_new();
-       mgmt_fe_client_ctx.ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
-       mgmt_fe_client_ctx.obuf_fifo = stream_fifo_new();
-
-       mgmt_fe_client_ctx.obuf_work = NULL;
+       mgmt_msg_init(&mgmt_fe_client_ctx.mstate, MGMTD_FE_MAX_NUM_MSG_PROC,
+                     MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN,
+                     "FE-client");
 
        mgmt_sessions_init(&mgmt_fe_client_ctx.client_sessions);
 
@@ -1322,8 +1064,6 @@ void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl)
 
        mgmt_fe_server_disconnect(client_ctx, false);
 
-       assert(mgmt_fe_client_ctx.ibuf_fifo && mgmt_fe_client_ctx.obuf_fifo);
-
        mgmt_fe_destroy_client_sessions(lib_hndl);
 
        THREAD_OFF(client_ctx->conn_retry_tmr);
@@ -1331,10 +1071,5 @@ void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl)
        THREAD_OFF(client_ctx->conn_write_ev);
        THREAD_OFF(client_ctx->conn_writes_on);
        THREAD_OFF(client_ctx->msg_proc_ev);
-       stream_fifo_free(mgmt_fe_client_ctx.ibuf_fifo);
-       if (mgmt_fe_client_ctx.ibuf_work)
-               stream_free(mgmt_fe_client_ctx.ibuf_work);
-       stream_fifo_free(mgmt_fe_client_ctx.obuf_fifo);
-       if (mgmt_fe_client_ctx.obuf_work)
-               stream_free(mgmt_fe_client_ctx.obuf_work);
+       mgmt_msg_destroy(&client_ctx->mstate);
 }
index 4ebecca2150b7c8f3c24c312c7adb1a39f7a4a3c..ac29b8f27ceaa8ab3e05d1978f0900539a956986 100644 (file)
@@ -12,8 +12,9 @@
 extern "C" {
 #endif
 
-#include "mgmtd/mgmt_defines.h"
 #include "mgmt_pb.h"
+#include "thread.h"
+#include "mgmtd/mgmt_defines.h"
 
 /***************************************************************
  * Macros
@@ -55,18 +56,6 @@ extern "C" {
 #define MGMTD_DS_OPERATIONAL MGMTD__DATASTORE_ID__OPERATIONAL_DS
 #define MGMTD_DS_MAX_ID MGMTD_DS_OPERATIONAL + 1
 
-struct mgmt_fe_msg_hdr {
-       uint16_t marker;
-       uint16_t len; /* Includes header */
-};
-#define MGMTD_FE_MSG_HDR_LEN sizeof(struct mgmt_fe_msg_hdr)
-#define MGMTD_FE_MSG_MARKER 0xdeaf
-
-struct mgmt_fe_msg {
-       struct mgmt_fe_msg_hdr hdr;
-       uint8_t payload[];
-};
-
 /*
  * All the client specific information this library needs to
  * initialize itself, setup connection with MGMTD FrontEnd interface
index 8ad40642987ac611c5abe0b121f36aa8aa07c414..c57fa081a9936459fb28fd9d4a1bc44c9e7baa1e 100644 (file)
@@ -11,6 +11,7 @@
 #include "sockopt.h"
 #include "network.h"
 #include "libfrr.h"
+#include "mgmt_msg.h"
 #include "mgmt_pb.h"
 #include "mgmtd/mgmt.h"
 #include "mgmtd/mgmt_memory.h"
@@ -497,8 +498,7 @@ mgmt_be_adapter_writes_on(struct mgmt_be_client_adapter *adapter)
 {
        MGMTD_BE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name);
        UNSET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF);
-       if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo))
-               mgmt_be_adapter_sched_msg_write(adapter);
+       mgmt_be_adapter_sched_msg_write(adapter);
 }
 
 static inline void
@@ -509,40 +509,20 @@ mgmt_be_adapter_writes_off(struct mgmt_be_client_adapter *adapter)
 }
 
 static int mgmt_be_adapter_send_msg(struct mgmt_be_client_adapter *adapter,
-                                      Mgmtd__BeMessage *be_msg)
+                                   Mgmtd__BeMessage *be_msg)
 {
-       size_t msg_size;
-       uint8_t *msg_buf = adapter->msg_buf;
-       struct mgmt_be_msg *msg;
-
-       if (adapter->conn_fd < 0)
-               return -1;
-
-       msg_size = mgmtd__be_message__get_packed_size(be_msg);
-       msg_size += MGMTD_BE_MSG_HDR_LEN;
-       if (msg_size > MGMTD_BE_MSG_MAX_LEN) {
-               MGMTD_BE_ADAPTER_ERR(
-                       "Message size %d more than max size'%d. Not sending!'",
-                       (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN);
+       if (adapter->conn_fd == -1) {
+               MGMTD_BE_ADAPTER_DBG("can't send message on closed connection");
                return -1;
        }
 
-       msg = (struct mgmt_be_msg *)msg_buf;
-       msg->hdr.marker = MGMTD_BE_MSG_MARKER;
-       msg->hdr.len = (uint16_t)msg_size;
-       mgmtd__be_message__pack(be_msg, msg->payload);
-
-       if (!adapter->obuf_work)
-               adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
-       if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) {
-               stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
-               adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
-       }
-       stream_write(adapter->obuf_work, (void *)msg_buf, msg_size);
-
+       int rv = mgmt_msg_send_msg(
+               &adapter->mstate, be_msg,
+               mgmtd__be_message__get_packed_size(be_msg),
+               (size_t(*)(void *, void *))mgmtd__be_message__pack,
+               mgmt_debug_be);
        mgmt_be_adapter_sched_msg_write(adapter);
-       adapter->num_msg_tx++;
-       return 0;
+       return rv;
 }
 
 static int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter,
@@ -614,239 +594,67 @@ static int mgmt_be_send_cfgapply_req(struct mgmt_be_client_adapter *adapter,
        return mgmt_be_adapter_send_msg(adapter, &be_msg);
 }
 
-static uint16_t
-mgmt_be_adapter_process_msg(struct mgmt_be_client_adapter *adapter,
-                              uint8_t *msg_buf, uint16_t bytes_read)
+static void mgmt_be_adapter_process_msg(void *user_ctx, uint8_t *data,
+                                       size_t len)
 {
+       struct mgmt_be_client_adapter *adapter = user_ctx;
        Mgmtd__BeMessage *be_msg;
-       struct mgmt_be_msg *msg;
-       uint16_t bytes_left;
-       uint16_t processed = 0;
-
-       bytes_left = bytes_read;
-       for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;
-            bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
-               msg = (struct mgmt_be_msg *)msg_buf;
-               if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) {
-                       MGMTD_BE_ADAPTER_DBG(
-                               "Marker not found in message from MGMTD Backend adapter '%s'",
-                               adapter->name);
-                       break;
-               }
-
-               if (bytes_left < msg->hdr.len) {
-                       MGMTD_BE_ADAPTER_DBG(
-                               "Incomplete message of %d bytes (epxected: %u) from MGMTD Backend adapter '%s'",
-                               bytes_left, msg->hdr.len, adapter->name);
-                       break;
-               }
 
-               be_msg = mgmtd__be_message__unpack(
-                       NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN),
-                       msg->payload);
-               if (!be_msg) {
-                       MGMTD_BE_ADAPTER_DBG(
-                               "Failed to decode %d bytes from MGMTD Backend adapter '%s'",
-                               msg->hdr.len, adapter->name);
-                       continue;
-               }
-
-               (void)mgmt_be_adapter_handle_msg(adapter, be_msg);
-               mgmtd__be_message__free_unpacked(be_msg, NULL);
-               processed++;
-               adapter->num_msg_rx++;
+       be_msg = mgmtd__be_message__unpack(NULL, len, data);
+       if (!be_msg) {
+               MGMTD_BE_ADAPTER_DBG(
+                       "Failed to decode %zu bytes for adapter: %s", len,
+                       adapter->name);
+               return;
        }
-
-       return processed;
+       MGMTD_BE_ADAPTER_DBG("Decoded %zu bytes of message: %u for adapter: %s",
+                            len, be_msg->message_case, adapter->name);
+       (void)mgmt_be_adapter_handle_msg(adapter, be_msg);
+       mgmtd__be_message__free_unpacked(be_msg, NULL);
 }
 
 static void mgmt_be_adapter_proc_msgbufs(struct thread *thread)
 {
-       struct mgmt_be_client_adapter *adapter;
-       struct stream *work;
-       int processed = 0;
-
-       adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread);
-       assert(adapter);
-
-       if (adapter->conn_fd < 0)
-               return;
-
-       for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) {
-               work = stream_fifo_pop_safe(adapter->ibuf_fifo);
-               if (!work)
-                       break;
-
-               processed += mgmt_be_adapter_process_msg(
-                       adapter, STREAM_DATA(work), stream_get_endp(work));
-
-               if (work != adapter->ibuf_work) {
-                       /* Free it up */
-                       stream_free(work);
-               } else {
-                       /* Reset stream buffer for next read */
-                       stream_reset(work);
-               }
-       }
+       struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread);
 
-       /*
-        * If we have more to process, reschedule for processing it.
-        */
-       if (stream_fifo_head(adapter->ibuf_fifo))
+       if (mgmt_msg_procbufs(&adapter->mstate, mgmt_be_adapter_process_msg,
+                             adapter, mgmt_debug_be))
                mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG);
 }
 
 static void mgmt_be_adapter_read(struct thread *thread)
 {
        struct mgmt_be_client_adapter *adapter;
-       int bytes_read, msg_cnt;
-       size_t total_bytes, bytes_left;
-       struct mgmt_be_msg_hdr *msg_hdr;
-       bool incomplete = false;
+       enum mgmt_msg_rsched rv;
 
        adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread);
-       assert(adapter && adapter->conn_fd >= 0);
 
-       total_bytes = 0;
-       bytes_left = STREAM_SIZE(adapter->ibuf_work) -
-                    stream_get_endp(adapter->ibuf_work);
-       for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
-               bytes_read = stream_read_try(adapter->ibuf_work,
-                                            adapter->conn_fd, bytes_left);
-               MGMTD_BE_ADAPTER_DBG(
-                       "Got %d bytes of message from MGMTD Backend adapter '%s'",
-                       bytes_read, adapter->name);
-               if (bytes_read <= 0) {
-                       if (bytes_read == -1
-                           && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                               mgmt_be_adapter_register_event(
-                                       adapter, MGMTD_BE_CONN_READ);
-                               return;
-                       }
-
-                       if (!bytes_read) {
-                               /* Looks like connection closed */
-                               MGMTD_BE_ADAPTER_ERR(
-                                       "Got error (%d) while reading from MGMTD Backend adapter '%s'. Err: '%s'",
-                                       bytes_read, adapter->name,
-                                       safe_strerror(errno));
-                               mgmt_be_adapter_disconnect(adapter);
-                               return;
-                       }
-                       break;
-               }
-
-               total_bytes += bytes_read;
-               bytes_left -= bytes_read;
-       }
-
-       /*
-        * Check if we would have read incomplete messages or not.
-        */
-       stream_set_getp(adapter->ibuf_work, 0);
-       total_bytes = 0;
-       msg_cnt = 0;
-       bytes_left = stream_get_endp(adapter->ibuf_work);
-       for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
-               msg_hdr =
-                       (struct mgmt_be_msg_hdr *)(STREAM_DATA(
-                                                             adapter->ibuf_work)
-                                                     + total_bytes);
-               if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) {
-                       /* Corrupted buffer. Force disconnect?? */
-                       MGMTD_BE_ADAPTER_ERR(
-                               "Received corrupted buffer from MGMTD Backend client.");
-                       mgmt_be_adapter_disconnect(adapter);
-                       return;
-               }
-               if (msg_hdr->len > bytes_left)
-                       break;
-
-               total_bytes += msg_hdr->len;
-               bytes_left -= msg_hdr->len;
-               msg_cnt++;
-       }
-
-       if (bytes_left > 0)
-               incomplete = true;
-
-       /*
-        * We would have read one or several messages.
-        * Schedule processing them now.
-        */
-       msg_hdr = (struct mgmt_be_msg_hdr *)(STREAM_DATA(adapter->ibuf_work)
-                                               + total_bytes);
-       stream_set_endp(adapter->ibuf_work, total_bytes);
-       stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work);
-       adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
-       if (incomplete) {
-               stream_put(adapter->ibuf_work, msg_hdr, bytes_left);
-               stream_set_endp(adapter->ibuf_work, bytes_left);
+       rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_be);
+       if (rv == MSR_DISCONNECT) {
+               mgmt_be_adapter_disconnect(adapter);
+               return;
        }
-
-       if (msg_cnt)
+       if (rv == MSR_SCHED_BOTH)
                mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG);
-
        mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ);
 }
 
 static void mgmt_be_adapter_write(struct thread *thread)
 {
-       int bytes_written = 0;
-       int processed = 0;
-       int msg_size = 0;
-       struct stream *s = NULL;
-       struct stream *free = NULL;
-       struct mgmt_be_client_adapter *adapter;
+       struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread);
+       enum mgmt_msg_wsched rv;
 
-       adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread);
-       assert(adapter && adapter->conn_fd >= 0);
-
-       /* Ensure pushing any pending write buffer to FIFO */
-       if (adapter->obuf_work) {
-               stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
-               adapter->obuf_work = NULL;
-       }
-
-       for (s = stream_fifo_head(adapter->obuf_fifo);
-            s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE;
-            s = stream_fifo_head(adapter->obuf_fifo)) {
-               /* msg_size = (int)stream_get_size(s); */
-               msg_size = (int)STREAM_READABLE(s);
-               bytes_written = stream_flush(s, adapter->conn_fd);
-               if (bytes_written == -1
-                   && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                       mgmt_be_adapter_register_event(adapter,
-                                                       MGMTD_BE_CONN_WRITE);
-                       return;
-               } else if (bytes_written != msg_size) {
-                       MGMTD_BE_ADAPTER_ERR(
-                               "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'",
-                               msg_size, bytes_written, safe_strerror(errno));
-                       if (bytes_written > 0) {
-                               stream_forward_getp(s, (size_t)bytes_written);
-                               stream_pulldown(s);
-                               mgmt_be_adapter_register_event(
-                                       adapter, MGMTD_BE_CONN_WRITE);
-                               return;
-                       }
-                       mgmt_be_adapter_disconnect(adapter);
-                       return;
-               }
-
-               free = stream_fifo_pop(adapter->obuf_fifo);
-               stream_free(free);
-               MGMTD_BE_ADAPTER_DBG(
-                       "Wrote %d bytes of message to MGMTD Backend client socket.'",
-                       bytes_written);
-               processed++;
-       }
-
-       if (s) {
+       rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_be);
+       if (rv == MSW_SCHED_STREAM)
+               mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_WRITE);
+       else if (rv == MSW_DISCONNECT)
+               mgmt_be_adapter_disconnect(adapter);
+       else if (rv == MSW_SCHED_WRITES_OFF) {
                mgmt_be_adapter_writes_off(adapter);
                mgmt_be_adapter_register_event(adapter,
-                                               MGMTD_BE_CONN_WRITES_ON);
-       }
+                                              MGMTD_BE_CONN_WRITES_ON);
+       } else
+               assert(rv == MSW_SCHED_NONE);
 }
 
 static void mgmt_be_adapter_resume_writes(struct thread *thread)
@@ -936,6 +744,14 @@ mgmt_be_adapter_register_event(struct mgmt_be_client_adapter *adapter,
                assert(adapter->conn_read_ev);
                break;
        case MGMTD_BE_CONN_WRITE:
+               if (adapter->conn_write_ev)
+                       MGMTD_BE_ADAPTER_DBG(
+                               "write ready notify already set for client %s",
+                               adapter->name);
+               else
+                       MGMTD_BE_ADAPTER_DBG(
+                               "scheduling write ready notify for client %s",
+                               adapter->name);
                thread_add_write(mgmt_be_adapter_tm, mgmt_be_adapter_write,
                                 adapter, adapter->conn_fd, &adapter->conn_write_ev);
                assert(adapter->conn_write_ev);
@@ -976,17 +792,12 @@ extern void mgmt_be_adapter_unlock(struct mgmt_be_client_adapter **adapter)
        (*adapter)->refcount--;
        if (!(*adapter)->refcount) {
                mgmt_be_adapters_del(&mgmt_be_adapters, *adapter);
-
-               stream_fifo_free((*adapter)->ibuf_fifo);
-               stream_free((*adapter)->ibuf_work);
-               stream_fifo_free((*adapter)->obuf_fifo);
-               stream_free((*adapter)->obuf_work);
-
                THREAD_OFF((*adapter)->conn_init_ev);
                THREAD_OFF((*adapter)->conn_read_ev);
                THREAD_OFF((*adapter)->conn_write_ev);
                THREAD_OFF((*adapter)->conn_writes_on);
                THREAD_OFF((*adapter)->proc_msg_ev);
+               mgmt_msg_destroy(&(*adapter)->mstate);
                XFREE(MTYPE_MGMTD_BE_ADPATER, *adapter);
        }
 
@@ -1029,11 +840,9 @@ mgmt_be_create_adapter(int conn_fd, union sockunion *from)
                memcpy(&adapter->conn_su, from, sizeof(adapter->conn_su));
                snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d",
                         adapter->conn_fd);
-               adapter->ibuf_fifo = stream_fifo_new();
-               adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
-               adapter->obuf_fifo = stream_fifo_new();
-               /* adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); */
-               adapter->obuf_work = NULL;
+               mgmt_msg_init(&adapter->mstate, MGMTD_BE_MAX_NUM_MSG_PROC,
+                             MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN,
+                             "BE-adapter");
                mgmt_be_adapter_lock(adapter);
 
                mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ);
@@ -1195,8 +1004,14 @@ void mgmt_be_adapter_status_write(struct vty *vty)
                vty_out(vty, "    Conn-FD: \t\t\t%d\n", adapter->conn_fd);
                vty_out(vty, "    Client-Id: \t\t\t%d\n", adapter->id);
                vty_out(vty, "    Ref-Count: \t\t\t%u\n", adapter->refcount);
-               vty_out(vty, "    Msg-Sent: \t\t\t%u\n", adapter->num_msg_tx);
-               vty_out(vty, "    Msg-Recvd: \t\t\t%u\n", adapter->num_msg_rx);
+               vty_out(vty, "    Msg-Recvd: \t\t\t%" PRIu64 "\n",
+                       adapter->mstate.nrxm);
+               vty_out(vty, "    Bytes-Recvd: \t\t%" PRIu64 "\n",
+                       adapter->mstate.nrxb);
+               vty_out(vty, "    Msg-Sent: \t\t\t%" PRIu64 "\n",
+                       adapter->mstate.ntxm);
+               vty_out(vty, "    Bytes-Sent: \t\t%" PRIu64 "\n",
+                       adapter->mstate.ntxb);
        }
        vty_out(vty, "  Total: %d\n",
                (int)mgmt_be_adapters_count(&mgmt_be_adapters));
index 5dfc2386dab243960619accba935555ec4d0e27d..7f57233d3536348b17d85f500feeb7e689a63be4 100644 (file)
@@ -9,8 +9,9 @@
 #ifndef _FRR_MGMTD_BE_ADAPTER_H_
 #define _FRR_MGMTD_BE_ADAPTER_H_
 
-#include "mgmtd/mgmt_defines.h"
 #include "mgmt_be_client.h"
+#include "mgmt_msg.h"
+#include "mgmtd/mgmt_defines.h"
 #include "mgmtd/mgmt_ds.h"
 
 #define MGMTD_BE_CONN_INIT_DELAY_MSEC 50
@@ -54,22 +55,9 @@ struct mgmt_be_client_adapter {
        char xpath_reg[MGMTD_MAX_NUM_XPATH_REG][MGMTD_MAX_XPATH_LEN];
 
        /* IO streams for read and write */
-       /* pthread_mutex_t ibuf_mtx; */
-       struct stream_fifo *ibuf_fifo;
-       /* pthread_mutex_t obuf_mtx; */
-       struct stream_fifo *obuf_fifo;
-
-       /* Private I/O buffers */
-       struct stream *ibuf_work;
-       struct stream *obuf_work;
-       uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN];
-
-       /* Buffer of data waiting to be written to client. */
-       /* struct buffer *wb; */
+       struct mgmt_msg_state mstate;
 
        int refcount;
-       uint32_t num_msg_tx;
-       uint32_t num_msg_rx;
 
        /*
         * List of config items that should be sent to the
index 6464b12ae3ed664853344301172f964e7af1500f..6997fdcf81f8ce8faac5d0ef8edc0bcb30a0f905 100644 (file)
@@ -28,7 +28,7 @@
        zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__)
 #endif /* REDIRECT_DEBUG_TO_STDERR */
 
-static int mgmt_be_listen_fd;
+static int mgmt_be_listen_fd = -1;
 static struct thread_master *mgmt_be_listen_tm;
 static struct thread *mgmt_be_listen_ev;
 static void mgmt_be_server_register_event(enum mgmt_be_event event);
index 1ea812c1a74a91700d7a0c4b3363236acab476dd..cc812ab150ed931e1d53768210eb5e4e37aafcd8 100644 (file)
@@ -11,6 +11,7 @@
 #include "network.h"
 #include "libfrr.h"
 #include "mgmt_fe_client.h"
+#include "mgmt_msg.h"
 #include "mgmt_pb.h"
 #include "hash.h"
 #include "jhash.h"
@@ -375,8 +376,7 @@ mgmt_fe_adapter_writes_on(struct mgmt_fe_client_adapter *adapter)
 {
        MGMTD_FE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name);
        UNSET_FLAG(adapter->flags, MGMTD_FE_ADAPTER_FLAGS_WRITES_OFF);
-       if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo))
-               mgmt_fe_adapter_sched_msg_write(adapter);
+       mgmt_fe_adapter_sched_msg_write(adapter);
 }
 
 static inline void
@@ -390,40 +390,18 @@ static int
 mgmt_fe_adapter_send_msg(struct mgmt_fe_client_adapter *adapter,
                             Mgmtd__FeMessage *fe_msg)
 {
-       size_t msg_size;
-       uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN];
-       struct mgmt_fe_msg *msg;
-
-       if (adapter->conn_fd < 0) {
-               MGMTD_FE_ADAPTER_ERR("Connection already reset");
-               return -1;
-       }
-
-       msg_size = mgmtd__fe_message__get_packed_size(fe_msg);
-       msg_size += MGMTD_FE_MSG_HDR_LEN;
-       if (msg_size > sizeof(msg_buf)) {
-               MGMTD_FE_ADAPTER_ERR(
-                       "Message size %d more than max size'%d. Not sending!'",
-                       (int)msg_size, (int)sizeof(msg_buf));
+       if (adapter->conn_fd == -1) {
+               MGMTD_FE_ADAPTER_DBG("can't send message on closed connection");
                return -1;
        }
 
-       msg = (struct mgmt_fe_msg *)msg_buf;
-       msg->hdr.marker = MGMTD_FE_MSG_MARKER;
-       msg->hdr.len = (uint16_t)msg_size;
-       mgmtd__fe_message__pack(fe_msg, msg->payload);
-
-       if (!adapter->obuf_work)
-               adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
-       if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) {
-               stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
-               adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
-       }
-       stream_write(adapter->obuf_work, (void *)msg_buf, msg_size);
-
+       int rv = mgmt_msg_send_msg(
+               &adapter->mstate, fe_msg,
+               mgmtd__fe_message__get_packed_size(fe_msg),
+               (size_t(*)(void *, void *))mgmtd__fe_message__pack,
+               mgmt_debug_fe);
        mgmt_fe_adapter_sched_msg_write(adapter);
-       adapter->num_msg_tx++;
-       return 0;
+       return rv;
 }
 
 static int
@@ -1244,6 +1222,9 @@ static int mgmt_fe_session_handle_commit_config_req_msg(
                                "Failed to create a Configuration session!");
                        return 0;
                }
+               MGMTD_FE_ADAPTER_DBG(
+                       "Created txn %llu for session %llu for COMMIT-CFG-REQ",
+                       session->cfg_txn_id, session->session_id);
        }
 
 
@@ -1430,252 +1411,66 @@ mgmt_fe_adapter_handle_msg(struct mgmt_fe_client_adapter *adapter,
        return 0;
 }
 
-static uint16_t
-mgmt_fe_adapter_process_msg(struct mgmt_fe_client_adapter *adapter,
-                               uint8_t *msg_buf, uint16_t bytes_read)
+static void mgmt_fe_adapter_process_msg(void *user_ctx, uint8_t *data,
+                                       size_t len)
 {
+       struct mgmt_fe_client_adapter *adapter = user_ctx;
        Mgmtd__FeMessage *fe_msg;
-       struct mgmt_fe_msg *msg;
-       uint16_t bytes_left;
-       uint16_t processed = 0;
-
-       MGMTD_FE_ADAPTER_DBG(
-               "Have %u bytes of messages from client '%s' to process",
-               bytes_read, adapter->name);
-
-       bytes_left = bytes_read;
-       for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;
-            bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
-               msg = (struct mgmt_fe_msg *)msg_buf;
-               if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) {
-                       MGMTD_FE_ADAPTER_DBG(
-                               "Marker not found in message from MGMTD Frontend adapter '%s'",
-                               adapter->name);
-                       break;
-               }
-
-               if (bytes_left < msg->hdr.len) {
-                       MGMTD_FE_ADAPTER_DBG(
-                               "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend adapter '%s'",
-                               bytes_left, msg->hdr.len, adapter->name);
-                       break;
-               }
-
-               fe_msg = mgmtd__fe_message__unpack(
-                       NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN),
-                       msg->payload);
-               if (!fe_msg) {
-                       MGMTD_FE_ADAPTER_DBG(
-                               "Failed to decode %d bytes from MGMTD Frontend adapter '%s'",
-                               msg->hdr.len, adapter->name);
-                       continue;
-               }
 
+       fe_msg = mgmtd__fe_message__unpack(NULL, len, data);
+       if (!fe_msg) {
                MGMTD_FE_ADAPTER_DBG(
-                       "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend adapter '%s'",
-                       msg->hdr.len, fe_msg->message_case,
-                       fe_msg->message_case, adapter->name);
-
-               (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg);
-
-               mgmtd__fe_message__free_unpacked(fe_msg, NULL);
-               processed++;
-               adapter->num_msg_rx++;
+                       "Failed to decode %zu bytes for adapter: %s", len,
+                       adapter->name);
+               return;
        }
-
-       return processed;
+       MGMTD_FE_ADAPTER_DBG(
+               "Decoded %zu bytes of message: %u from adapter: %s", len,
+               fe_msg->message_case, adapter->name);
+       (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg);
+       mgmtd__fe_message__free_unpacked(fe_msg, NULL);
 }
 
 static void mgmt_fe_adapter_proc_msgbufs(struct thread *thread)
 {
-       struct mgmt_fe_client_adapter *adapter;
-       struct stream *work;
-       int processed = 0;
+       struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread);
 
-       adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
-       assert(adapter && adapter->conn_fd >= 0);
-
-       MGMTD_FE_ADAPTER_DBG("Have %d ibufs for client '%s' to process",
-                            (int)stream_fifo_count_safe(adapter->ibuf_fifo),
-                            adapter->name);
-
-       for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) {
-               work = stream_fifo_pop_safe(adapter->ibuf_fifo);
-               if (!work)
-                       break;
-
-               processed += mgmt_fe_adapter_process_msg(
-                       adapter, STREAM_DATA(work), stream_get_endp(work));
-
-               if (work != adapter->ibuf_work) {
-                       /* Free it up */
-                       stream_free(work);
-               } else {
-                       /* Reset stream buffer for next read */
-                       stream_reset(work);
-               }
-       }
-
-       /*
-        * If we have more to process, reschedule for processing it.
-        */
-       if (stream_fifo_head(adapter->ibuf_fifo))
+       if (mgmt_msg_procbufs(&adapter->mstate, mgmt_fe_adapter_process_msg,
+                             adapter, mgmt_debug_fe))
                mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG);
 }
 
 static void mgmt_fe_adapter_read(struct thread *thread)
 {
-       struct mgmt_fe_client_adapter *adapter;
-       int bytes_read, msg_cnt;
-       size_t total_bytes, bytes_left;
-       struct mgmt_fe_msg_hdr *msg_hdr;
-       bool incomplete = false;
+       struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread);
+       enum mgmt_msg_rsched rv;
 
-       adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
-       assert(adapter && adapter->conn_fd);
-
-       total_bytes = 0;
-       bytes_left = STREAM_SIZE(adapter->ibuf_work)
-                    - stream_get_endp(adapter->ibuf_work);
-       for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
-               bytes_read = stream_read_try(adapter->ibuf_work, adapter->conn_fd,
-                                            bytes_left);
-               MGMTD_FE_ADAPTER_DBG(
-                       "Got %d bytes of message from MGMTD Frontend adapter '%s'",
-                       bytes_read, adapter->name);
-               if (bytes_read <= 0) {
-                       if (bytes_read == -1
-                           && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                               mgmt_fe_adapter_register_event(
-                                       adapter, MGMTD_FE_CONN_READ);
-                               return;
-                       }
-
-                       if (!bytes_read) {
-                               /* Looks like connection closed */
-                               MGMTD_FE_ADAPTER_ERR(
-                                       "Got error (%d) while reading from MGMTD Frontend adapter '%s'. Err: '%s'",
-                                       bytes_read, adapter->name,
-                                       safe_strerror(errno));
-                               mgmt_fe_adapter_disconnect(adapter);
-                               return;
-                       }
-                       break;
-               }
-
-               total_bytes += bytes_read;
-               bytes_left -= bytes_read;
-       }
-
-       /*
-        * Check if we would have read incomplete messages or not.
-        */
-       stream_set_getp(adapter->ibuf_work, 0);
-       total_bytes = 0;
-       msg_cnt = 0;
-       bytes_left = stream_get_endp(adapter->ibuf_work);
-       for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
-               msg_hdr =
-                       (struct mgmt_fe_msg_hdr *)(STREAM_DATA(
-                                                              adapter->ibuf_work)
-                                                      + total_bytes);
-               if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) {
-                       /* Corrupted buffer. Force disconnect?? */
-                       MGMTD_FE_ADAPTER_ERR(
-                               "Received corrupted buffer from MGMTD frontend client.");
-                       mgmt_fe_adapter_disconnect(adapter);
-                       return;
-               }
-               if (msg_hdr->len > bytes_left)
-                       break;
-
-               MGMTD_FE_ADAPTER_DBG("Got message (len: %u) from client '%s'",
-                                      msg_hdr->len, adapter->name);
-
-               total_bytes += msg_hdr->len;
-               bytes_left -= msg_hdr->len;
-               msg_cnt++;
-       }
-
-       if (bytes_left > 0)
-               incomplete = true;
-       /*
-        * We would have read one or several messages.
-        * Schedule processing them now.
-        */
-       msg_hdr = (struct mgmt_fe_msg_hdr *)(STREAM_DATA(adapter->ibuf_work)
-                                                + total_bytes);
-       stream_set_endp(adapter->ibuf_work, total_bytes);
-       stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work);
-       adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
-       if (incomplete) {
-               stream_put(adapter->ibuf_work, msg_hdr, bytes_left);
-               stream_set_endp(adapter->ibuf_work, bytes_left);
+       rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe);
+       if (rv == MSR_DISCONNECT) {
+               mgmt_fe_adapter_disconnect(adapter);
+               return;
        }
-
-       if (msg_cnt)
+       if (rv == MSR_SCHED_BOTH)
                mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG);
-
        mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ);
 }
 
 static void mgmt_fe_adapter_write(struct thread *thread)
 {
-       int bytes_written = 0;
-       int processed = 0;
-       int msg_size = 0;
-       struct stream *s = NULL;
-       struct stream *free = NULL;
-       struct mgmt_fe_client_adapter *adapter;
-
-       adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
-       assert(adapter && adapter->conn_fd);
-
-       /* Ensure pushing any pending write buffer to FIFO */
-       if (adapter->obuf_work) {
-               stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
-               adapter->obuf_work = NULL;
-       }
-
-       for (s = stream_fifo_head(adapter->obuf_fifo);
-            s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE;
-            s = stream_fifo_head(adapter->obuf_fifo)) {
-               /* msg_size = (int)stream_get_size(s); */
-               msg_size = (int)STREAM_READABLE(s);
-               bytes_written = stream_flush(s, adapter->conn_fd);
-               if (bytes_written == -1
-                   && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                       mgmt_fe_adapter_register_event(
-                               adapter, MGMTD_FE_CONN_WRITE);
-                       return;
-               } else if (bytes_written != msg_size) {
-                       MGMTD_FE_ADAPTER_ERR(
-                               "Could not write all %d bytes (wrote: %d) to MGMTD Frontend client socket. Err: '%s'",
-                               msg_size, bytes_written, safe_strerror(errno));
-                       if (bytes_written > 0) {
-                               stream_forward_getp(s, (size_t)bytes_written);
-                               stream_pulldown(s);
-                               mgmt_fe_adapter_register_event(
-                                       adapter, MGMTD_FE_CONN_WRITE);
-                               return;
-                       }
-                       mgmt_fe_adapter_disconnect(adapter);
-                       return;
-               }
-
-               free = stream_fifo_pop(adapter->obuf_fifo);
-               stream_free(free);
-               MGMTD_FE_ADAPTER_DBG(
-                       "Wrote %d bytes of message to MGMTD Frontend client socket.'",
-                       bytes_written);
-               processed++;
-       }
+       struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread);
+       enum mgmt_msg_wsched rv;
 
-       if (s) {
+       rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe);
+       if (rv == MSW_SCHED_STREAM)
+               mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_WRITE);
+       else if (rv == MSW_DISCONNECT)
+               mgmt_fe_adapter_disconnect(adapter);
+       else if (rv == MSW_SCHED_WRITES_OFF) {
                mgmt_fe_adapter_writes_off(adapter);
                mgmt_fe_adapter_register_event(adapter,
-                                                MGMTD_FE_CONN_WRITES_ON);
-       }
+                                              MGMTD_FE_CONN_WRITES_ON);
+       } else
+               assert(rv == MSW_SCHED_NONE);
 }
 
 static void mgmt_fe_adapter_resume_writes(struct thread *thread)
@@ -1683,7 +1478,7 @@ static void mgmt_fe_adapter_resume_writes(struct thread *thread)
        struct mgmt_fe_client_adapter *adapter;
 
        adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
-       assert(adapter && adapter->conn_fd);
+       assert(adapter && adapter->conn_fd != -1);
 
        mgmt_fe_adapter_writes_on(adapter);
 }
@@ -1739,16 +1534,11 @@ mgmt_fe_adapter_unlock(struct mgmt_fe_client_adapter **adapter)
        (*adapter)->refcount--;
        if (!(*adapter)->refcount) {
                mgmt_fe_adapters_del(&mgmt_fe_adapters, *adapter);
-
-               stream_fifo_free((*adapter)->ibuf_fifo);
-               stream_free((*adapter)->ibuf_work);
-               stream_fifo_free((*adapter)->obuf_fifo);
-               stream_free((*adapter)->obuf_work);
-
                THREAD_OFF((*adapter)->conn_read_ev);
                THREAD_OFF((*adapter)->conn_write_ev);
                THREAD_OFF((*adapter)->proc_msg_ev);
                THREAD_OFF((*adapter)->conn_writes_on);
+               mgmt_msg_destroy(&(*adapter)->mstate);
                XFREE(MTYPE_MGMTD_FE_ADPATER, *adapter);
        }
 
@@ -1793,11 +1583,10 @@ mgmt_fe_create_adapter(int conn_fd, union sockunion *from)
                snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d",
                         adapter->conn_fd);
                mgmt_fe_sessions_init(&adapter->fe_sessions);
-               adapter->ibuf_fifo = stream_fifo_new();
-               adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
-               adapter->obuf_fifo = stream_fifo_new();
-               /* adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); */
-               adapter->obuf_work = NULL;
+
+               mgmt_msg_init(&adapter->mstate, MGMTD_FE_MAX_NUM_MSG_PROC,
+                             MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN,
+                             "FE-adapter");
                mgmt_fe_adapter_lock(adapter);
 
                mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ);
@@ -2083,9 +1872,14 @@ void mgmt_fe_adapter_status_write(struct vty *vty, bool detail)
                }
                vty_out(vty, "    Total-Sessions: \t\t\t%d\n",
                        (int)mgmt_fe_sessions_count(&adapter->fe_sessions));
-               vty_out(vty, "    Msg-Sent: \t\t\t\t%u\n", adapter->num_msg_tx);
-               vty_out(vty, "    Msg-Recvd: \t\t\t\t%u\n",
-                       adapter->num_msg_rx);
+               vty_out(vty, "    Msg-Recvd: \t\t\t\t%" PRIu64 "\n",
+                       adapter->mstate.nrxm);
+               vty_out(vty, "    Bytes-Recvd: \t\t\t%" PRIu64 "\n",
+                       adapter->mstate.nrxb);
+               vty_out(vty, "    Msg-Sent: \t\t\t\t%" PRIu64 "\n",
+                       adapter->mstate.ntxm);
+               vty_out(vty, "    Bytes-Sent: \t\t\t%" PRIu64 "\n",
+                       adapter->mstate.ntxb);
        }
        vty_out(vty, "  Total: %d\n",
                (int)mgmt_fe_adapters_count(&mgmt_fe_adapters));
index 05d37d3f36f986f7733749859cdc401a036a0d79..3389234a3f4b0136a1b15818bdc9812fd9dcf25d 100644 (file)
@@ -9,6 +9,10 @@
 #ifndef _FRR_MGMTD_FE_ADAPTER_H_
 #define _FRR_MGMTD_FE_ADAPTER_H_
 
+#include "mgmt_fe_client.h"
+#include "mgmt_msg.h"
+#include "mgmtd/mgmt_defines.h"
+
 struct mgmt_fe_client_adapter;
 struct mgmt_master;
 
@@ -64,18 +68,9 @@ struct mgmt_fe_client_adapter {
        struct mgmt_fe_sessions_head fe_sessions;
 
        /* IO streams for read and write */
-       /* pthread_mutex_t ibuf_mtx; */
-       struct stream_fifo *ibuf_fifo;
-       /* pthread_mutex_t obuf_mtx; */
-       struct stream_fifo *obuf_fifo;
-
-       /* Private I/O buffers */
-       struct stream *ibuf_work;
-       struct stream *obuf_work;
+       struct mgmt_msg_state mstate;
 
        int refcount;
-       uint32_t num_msg_tx;
-       uint32_t num_msg_rx;
        struct mgmt_commit_stats cmt_stats;
        struct mgmt_setcfg_stats setcfg_stats;
 
index 2db4397cce82b9023fb7eb8c8ec631b720f4def3..0b0a56ea65c6efa55d7167ac1cfa1719d5797dd6 100644 (file)
@@ -28,7 +28,7 @@
        zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__)
 #endif /* REDIRECT_DEBUG_TO_STDERR */
 
-static int mgmt_fe_listen_fd;
+static int mgmt_fe_listen_fd = -1;
 static struct thread_master *mgmt_fe_listen_tm;
 static struct thread *mgmt_fe_listen_ev;
 static void mgmt_fe_server_register_event(enum mgmt_fe_event event);
index 115aa532c47fe094c4900bd88a113acf1483b531..05b593798e43bfc9d34d478a2fd2090a51fa93bc 100644 (file)
@@ -2471,6 +2471,8 @@ int mgmt_txn_notify_be_adapter_conn(struct mgmt_be_client_adapter *adapter,
                        return -1;
                }
 
+               MGMTD_TXN_DBG("Created initial txn %llu for BE connection %s",
+                             txn->txn_id, adapter->name);
                /*
                 * Set the changeset for transaction to commit and trigger the
                 * commit request.
@@ -2608,53 +2610,7 @@ int mgmt_txn_notify_be_cfgdata_reply(
        return 0;
 }
 
-int mgmt_txn_notify_be_cfg_validate_reply(
-       uint64_t txn_id, bool success, uint64_t batch_ids[],
-       size_t num_batch_ids, char *error_if_any,
-       struct mgmt_be_client_adapter *adapter)
-{
-       struct mgmt_txn_ctx *txn;
-       struct mgmt_txn_be_cfg_batch *cfg_btch;
-       struct mgmt_commit_cfg_req *cmtcfg_req = NULL;
-       size_t indx;
-
-       txn = mgmt_txn_id2ctx(txn_id);
-       if (!txn || txn->type != MGMTD_TXN_TYPE_CONFIG)
-               return -1;
-
-       assert(txn->commit_cfg_req);
-       cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg;
-
-       if (!success) {
-               MGMTD_TXN_ERR(
-                       "CFGDATA_VALIDATE_REQ sent to '%s' failed for Txn %p, Batches [0x%llx - 0x%llx], Err: %s",
-                       adapter->name, txn, (unsigned long long)batch_ids[0],
-                       (unsigned long long)batch_ids[num_batch_ids - 1],
-                       error_if_any ? error_if_any : "None");
-               mgmt_txn_send_commit_cfg_reply(
-                       txn, MGMTD_INTERNAL_ERROR,
-                       "Internal error! Failed to validate config data on backend!");
-               return 0;
-       }
-
-       for (indx = 0; indx < num_batch_ids; indx++) {
-               cfg_btch = mgmt_txn_cfgbatch_id2ctx(txn, batch_ids[indx]);
-               if (cfg_btch->txn != txn)
-                       return -1;
-               mgmt_move_txn_cfg_batch_to_next(
-                       cmtcfg_req, cfg_btch,
-                       &cmtcfg_req->curr_batches[adapter->id],
-                       &cmtcfg_req->next_batches[adapter->id], true,
-                       MGMTD_COMMIT_PHASE_APPLY_CFG);
-       }
-
-       mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req);
-
-       return 0;
-}
-
-extern int
-mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success,
+int mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success,
                                       uint64_t batch_ids[],
                                       size_t num_batch_ids, char *error_if_any,
                                       struct mgmt_be_client_adapter *adapter)
@@ -2852,6 +2808,8 @@ int mgmt_txn_rollback_trigger_cfg_apply(struct mgmt_ds_ctx *src_ds_ctx,
                return -1;
        }
 
+       MGMTD_TXN_DBG("Created rollback txn %llu", txn->txn_id);
+
        /*
         * Set the changeset for transaction to commit and trigger the commit
         * request.