summaryrefslogtreecommitdiff
path: root/lib/mgmt_be_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mgmt_be_client.c')
-rw-r--r--lib/mgmt_be_client.c436
1 files changed, 85 insertions, 351 deletions
diff --git a/lib/mgmt_be_client.c b/lib/mgmt_be_client.c
index 1e98c6123d..4dc7b296b8 100644
--- a/lib/mgmt_be_client.c
+++ b/lib/mgmt_be_client.c
@@ -9,6 +9,7 @@
#include "libfrr.h"
#include "mgmtd/mgmt.h"
#include "mgmt_be_client.h"
+#include "mgmt_msg.h"
#include "mgmt_pb.h"
#include "network.h"
#include "stream.h"
@@ -111,14 +112,8 @@ struct mgmt_be_client_ctx {
struct thread *conn_writes_on;
struct thread *msg_proc_ev;
uint32_t flags;
- uint32_t num_msg_tx;
- uint32_t num_msg_rx;
- struct stream_fifo *ibuf_fifo;
- struct stream *ibuf_work;
- struct stream_fifo *obuf_fifo;
- struct stream *obuf_work;
- uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN];
+ struct mgmt_msg_state mstate;
struct nb_config *candidate_config;
struct nb_config *running_config;
@@ -143,7 +138,9 @@ struct mgmt_be_client_ctx {
static bool mgmt_debug_be_client;
-static struct mgmt_be_client_ctx mgmt_be_client_ctx = {0};
+static struct mgmt_be_client_ctx mgmt_be_client_ctx = {
+ .conn_fd = -1,
+};
const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = {
#ifdef HAVE_STATICD
@@ -168,14 +165,13 @@ mgmt_be_server_disconnect(struct mgmt_be_client_ctx *client_ctx,
{
/* Notify client through registered callback (if any) */
if (client_ctx->client_params.client_connect_notify)
- (void)(*client_ctx->client_params
- .client_connect_notify)(
+ (void)(*client_ctx->client_params.client_connect_notify)(
(uintptr_t)client_ctx,
client_ctx->client_params.user_data, false);
- if (client_ctx->conn_fd) {
+ if (client_ctx->conn_fd != -1) {
close(client_ctx->conn_fd);
- client_ctx->conn_fd = 0;
+ client_ctx->conn_fd = -1;
}
if (reconnect)
@@ -881,181 +877,47 @@ mgmt_be_client_handle_msg(struct mgmt_be_client_ctx *client_ctx,
return 0;
}
-static int
-mgmt_be_client_process_msg(struct mgmt_be_client_ctx *client_ctx,
- uint8_t *msg_buf, int bytes_read)
+static void mgmt_be_client_process_msg(void *user_ctx, uint8_t *data,
+ size_t len)
{
+ struct mgmt_be_client_ctx *client_ctx = user_ctx;
Mgmtd__BeMessage *be_msg;
- struct mgmt_be_msg *msg;
- uint16_t bytes_left;
- uint16_t processed = 0;
-
- MGMTD_BE_CLIENT_DBG(
- "Got message of %d bytes from MGMTD Backend Server",
- bytes_read);
-
- bytes_left = bytes_read;
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;
- bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
- msg = (struct mgmt_be_msg *)msg_buf;
- if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) {
- MGMTD_BE_CLIENT_DBG(
- "Marker not found in message from MGMTD '%s'",
- client_ctx->client_params.name);
- break;
- }
-
- if (bytes_left < msg->hdr.len) {
- MGMTD_BE_CLIENT_DBG(
- "Incomplete message of %d bytes (epxected: %u) from MGMTD '%s'",
- bytes_left, msg->hdr.len,
- client_ctx->client_params.name);
- break;
- }
-
- be_msg = mgmtd__be_message__unpack(
- NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN),
- msg->payload);
- if (!be_msg) {
- MGMTD_BE_CLIENT_DBG(
- "Failed to decode %d bytes from MGMTD '%s'",
- msg->hdr.len, client_ctx->client_params.name);
- continue;
- }
- (void)mgmt_be_client_handle_msg(client_ctx, be_msg);
- mgmtd__be_message__free_unpacked(be_msg, NULL);
- processed++;
- client_ctx->num_msg_rx++;
+ be_msg = mgmtd__be_message__unpack(NULL, len, data);
+ if (!be_msg) {
+ MGMTD_BE_CLIENT_DBG("Failed to decode %zu bytes from server",
+ len);
+ return;
}
-
- return processed;
+ MGMTD_BE_CLIENT_DBG(
+ "Decoded %zu bytes of message(msg: %u/%u) from server", len,
+ be_msg->message_case, be_msg->message_case);
+ (void)mgmt_be_client_handle_msg(client_ctx, be_msg);
+ mgmtd__be_message__free_unpacked(be_msg, NULL);
}
static void mgmt_be_client_proc_msgbufs(struct thread *thread)
{
- struct mgmt_be_client_ctx *client_ctx;
- struct stream *work;
- int processed = 0;
-
- client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx);
-
- if (client_ctx->conn_fd == 0)
- return;
-
- for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) {
- work = stream_fifo_pop_safe(client_ctx->ibuf_fifo);
- if (!work)
- break;
-
- processed += mgmt_be_client_process_msg(
- client_ctx, STREAM_DATA(work), stream_get_endp(work));
-
- if (work != client_ctx->ibuf_work) {
- /* Free it up */
- stream_free(work);
- } else {
- /* Reset stream buffer for next read */
- stream_reset(work);
- }
- }
+ struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread);
- /*
- * If we have more to process, reschedule for processing it.
- */
- if (stream_fifo_head(client_ctx->ibuf_fifo))
- mgmt_be_client_register_event(client_ctx,
- MGMTD_BE_PROC_MSG);
+ if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_be_client_process_msg,
+ client_ctx, mgmt_debug_be_client))
+ mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
}
static void mgmt_be_client_read(struct thread *thread)
{
- struct mgmt_be_client_ctx *client_ctx;
- int bytes_read, msg_cnt;
- size_t total_bytes, bytes_left;
- struct mgmt_be_msg_hdr *msg_hdr;
- bool incomplete = false;
-
- client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- total_bytes = 0;
- bytes_left = STREAM_SIZE(client_ctx->ibuf_work)
- - stream_get_endp(client_ctx->ibuf_work);
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
- bytes_read = stream_read_try(client_ctx->ibuf_work,
- client_ctx->conn_fd, bytes_left);
- MGMTD_BE_CLIENT_DBG(
- "Got %d bytes of message from MGMTD Backend server",
- bytes_read);
- /* -2 is normal nothing read, and to retry */
- if (bytes_read == -2)
- break;
- if (bytes_read <= 0) {
- if (bytes_read == 0) {
- MGMTD_BE_CLIENT_ERR(
- "Got EOF/disconnect while reading from MGMTD Frontend server.");
- } else {
- MGMTD_BE_CLIENT_ERR(
- "Got error (%d) while reading from MGMTD Backend server. Err: '%s'",
- bytes_read, safe_strerror(errno));
- }
- mgmt_be_server_disconnect(client_ctx, true);
- return;
- }
- total_bytes += bytes_read;
- bytes_left -= bytes_read;
- }
- /*
- * Check if we have read complete messages or not.
- */
- stream_set_getp(client_ctx->ibuf_work, 0);
- total_bytes = 0;
- msg_cnt = 0;
- bytes_left = stream_get_endp(client_ctx->ibuf_work);
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
- msg_hdr = (struct mgmt_be_msg_hdr
- *)(STREAM_DATA(client_ctx->ibuf_work)
- + total_bytes);
- if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) {
- /* Corrupted buffer. Force disconnect?? */
- MGMTD_BE_CLIENT_ERR(
- "Received corrupted buffer from MGMTD backend server.");
- mgmt_be_server_disconnect(client_ctx, true);
- return;
- }
- if (msg_hdr->len > bytes_left)
- break;
-
- total_bytes += msg_hdr->len;
- bytes_left -= msg_hdr->len;
- msg_cnt++;
- }
+ struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread);
+ enum mgmt_msg_rsched rv;
- if (!msg_cnt)
- goto resched;
-
- if (bytes_left > 0)
- incomplete = true;
-
- /*
- * We have read one or several messages.
- * Schedule processing them now.
- */
- msg_hdr =
- (struct mgmt_be_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) +
- total_bytes);
- stream_set_endp(client_ctx->ibuf_work, total_bytes);
- stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work);
- client_ctx->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- if (incomplete) {
- stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left);
- stream_set_endp(client_ctx->ibuf_work, bytes_left);
+ rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd,
+ mgmt_debug_be_client);
+ if (rv == MSR_DISCONNECT) {
+ mgmt_be_server_disconnect(client_ctx, true);
+ return;
}
- mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
-
-resched:
+ if (rv == MSR_SCHED_BOTH)
+ mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ);
}
@@ -1072,9 +934,7 @@ mgmt_be_client_writes_on(struct mgmt_be_client_ctx *client_ctx)
{
MGMTD_BE_CLIENT_DBG("Resume writing msgs");
UNSET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF);
- if (client_ctx->obuf_work
- || stream_fifo_count_safe(client_ctx->obuf_fifo))
- mgmt_be_client_sched_msg_write(client_ctx);
+ mgmt_be_client_sched_msg_write(client_ctx);
}
static inline void
@@ -1085,99 +945,39 @@ mgmt_be_client_writes_off(struct mgmt_be_client_ctx *client_ctx)
}
static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx,
- Mgmtd__BeMessage *be_msg)
+ Mgmtd__BeMessage *be_msg)
{
- size_t msg_size;
- uint8_t *msg_buf = client_ctx->msg_buf;
- struct mgmt_be_msg *msg;
-
- if (client_ctx->conn_fd == 0)
- return -1;
-
- msg_size = mgmtd__be_message__get_packed_size(be_msg);
- msg_size += MGMTD_BE_MSG_HDR_LEN;
- if (msg_size > MGMTD_BE_MSG_MAX_LEN) {
- MGMTD_BE_CLIENT_ERR(
- "Message size %d more than max size'%d. Not sending!'",
- (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN);
+ if (client_ctx->conn_fd == -1) {
+ MGMTD_BE_CLIENT_DBG("can't send message on closed connection");
return -1;
}
- msg = (struct mgmt_be_msg *)msg_buf;
- msg->hdr.marker = MGMTD_BE_MSG_MARKER;
- msg->hdr.len = (uint16_t)msg_size;
- mgmtd__be_message__pack(be_msg, msg->payload);
-
- if (!client_ctx->obuf_work)
- client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) {
- stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
- client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- }
- stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size);
-
+ int rv = mgmt_msg_send_msg(
+ &client_ctx->mstate, be_msg,
+ mgmtd__be_message__get_packed_size(be_msg),
+ (size_t(*)(void *, void *))mgmtd__be_message__pack,
+ mgmt_debug_be_client);
mgmt_be_client_sched_msg_write(client_ctx);
- client_ctx->num_msg_tx++;
- return 0;
+ return rv;
}
static void mgmt_be_client_write(struct thread *thread)
{
- int bytes_written = 0;
- int processed = 0;
- int msg_size = 0;
- struct stream *s = NULL;
- struct stream *free = NULL;
- struct mgmt_be_client_ctx *client_ctx;
-
- client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- /* Ensure pushing any pending write buffer to FIFO */
- if (client_ctx->obuf_work) {
- stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
- client_ctx->obuf_work = NULL;
- }
-
- for (s = stream_fifo_head(client_ctx->obuf_fifo);
- s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE;
- s = stream_fifo_head(client_ctx->obuf_fifo)) {
- /* msg_size = (int)stream_get_size(s); */
- msg_size = (int)STREAM_READABLE(s);
- bytes_written = stream_flush(s, client_ctx->conn_fd);
- if (bytes_written == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_be_client_register_event(
- client_ctx, MGMTD_BE_CONN_WRITE);
- return;
- } else if (bytes_written != msg_size) {
- MGMTD_BE_CLIENT_ERR(
- "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'",
- msg_size, bytes_written, safe_strerror(errno));
- if (bytes_written > 0) {
- stream_forward_getp(s, (size_t)bytes_written);
- stream_pulldown(s);
- mgmt_be_client_register_event(
- client_ctx, MGMTD_BE_CONN_WRITE);
- return;
- }
- mgmt_be_server_disconnect(client_ctx, true);
- return;
- }
-
- free = stream_fifo_pop(client_ctx->obuf_fifo);
- stream_free(free);
- MGMTD_BE_CLIENT_DBG(
- "Wrote %d bytes of message to MGMTD Backend client socket.'",
- bytes_written);
- processed++;
- }
-
- if (s) {
+ struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread);
+ enum mgmt_msg_wsched rv;
+
+ rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd,
+ mgmt_debug_be_client);
+ if (rv == MSW_SCHED_STREAM)
+ mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_WRITE);
+ else if (rv == MSW_DISCONNECT)
+ mgmt_be_server_disconnect(client_ctx, true);
+ else if (rv == MSW_SCHED_WRITES_OFF) {
mgmt_be_client_writes_off(client_ctx);
mgmt_be_client_register_event(client_ctx,
- MGMTD_BE_CONN_WRITES_ON);
- }
+ MGMTD_BE_CONN_WRITES_ON);
+ } else
+ assert(rv == MSW_SCHED_NONE);
}
static void mgmt_be_client_resume_writes(struct thread *thread)
@@ -1185,15 +985,14 @@ static void mgmt_be_client_resume_writes(struct thread *thread)
struct mgmt_be_client_ctx *client_ctx;
client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
+ assert(client_ctx && client_ctx->conn_fd != -1);
mgmt_be_client_writes_on(client_ctx);
}
static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx,
- bool subscr_xpaths,
- uint16_t num_reg_xpaths,
- char **reg_xpaths)
+ bool subscr_xpaths, uint16_t num_reg_xpaths,
+ char **reg_xpaths)
{
Mgmtd__BeMessage be_msg;
Mgmtd__BeSubscribeReq subscr_req;
@@ -1214,86 +1013,35 @@ static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx,
return mgmt_be_client_send_msg(client_ctx, &be_msg);
}
-static int mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx)
+static void mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx)
{
- int ret, sock, len;
- struct sockaddr_un addr;
-
- MGMTD_BE_CLIENT_DBG("Trying to connect to MGMTD Backend server at %s",
- MGMTD_BE_SERVER_PATH);
-
- assert(!client_ctx->conn_fd);
+ const char *dbgtag = mgmt_debug_be_client ? "BE-client" : NULL;
- sock = socket(AF_UNIX, SOCK_STREAM, 0);
- if (sock < 0) {
- MGMTD_BE_CLIENT_ERR("Failed to create socket");
- goto mgmt_be_server_connect_failed;
- }
-
- MGMTD_BE_CLIENT_DBG(
- "Created MGMTD Backend server socket successfully!");
+ assert(client_ctx->conn_fd == -1);
+ client_ctx->conn_fd = mgmt_msg_connect(
+ MGMTD_BE_SERVER_PATH, MGMTD_SOCKET_BE_SEND_BUF_SIZE,
+ MGMTD_SOCKET_BE_RECV_BUF_SIZE, dbgtag);
- memset(&addr, 0, sizeof(struct sockaddr_un));
- addr.sun_family = AF_UNIX;
- strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path));
-#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
- len = addr.sun_len = SUN_LEN(&addr);
-#else
- len = sizeof(addr.sun_family) + strlen(addr.sun_path);
-#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
-
- ret = connect(sock, (struct sockaddr *)&addr, len);
- if (ret < 0) {
- MGMTD_BE_CLIENT_ERR(
- "Failed to connect to MGMTD Backend Server at %s. Err: %s",
- addr.sun_path, safe_strerror(errno));
- close(sock);
- goto mgmt_be_server_connect_failed;
+ /* Send SUBSCRIBE_REQ message */
+ if (client_ctx->conn_fd == -1 ||
+ mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) {
+ mgmt_be_server_disconnect(client_ctx, true);
+ return;
}
- MGMTD_BE_CLIENT_DBG(
- "Connected to MGMTD Backend Server at %s successfully!",
- addr.sun_path);
- client_ctx->conn_fd = sock;
-
- /* Make client socket non-blocking. */
- set_nonblocking(sock);
- setsockopt_so_sendbuf(client_ctx->conn_fd,
- MGMTD_SOCKET_BE_SEND_BUF_SIZE);
- setsockopt_so_recvbuf(client_ctx->conn_fd,
- MGMTD_SOCKET_BE_RECV_BUF_SIZE);
-
+ /* Start reading from the socket */
mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ);
/* Notify client through registered callback (if any) */
if (client_ctx->client_params.client_connect_notify)
- (void)(*client_ctx->client_params
- .client_connect_notify)(
+ (void)(*client_ctx->client_params.client_connect_notify)(
(uintptr_t)client_ctx,
client_ctx->client_params.user_data, true);
-
- /* Send SUBSCRIBE_REQ message */
- if (mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0)
- goto mgmt_be_server_connect_failed;
-
- return 0;
-
-mgmt_be_server_connect_failed:
- if (sock && sock != client_ctx->conn_fd)
- close(sock);
-
- mgmt_be_server_disconnect(client_ctx, true);
- return -1;
}
static void mgmt_be_client_conn_timeout(struct thread *thread)
{
- struct mgmt_be_client_ctx *client_ctx;
-
- client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx);
-
- mgmt_be_server_connect(client_ctx);
+ mgmt_be_server_connect(THREAD_ARG(thread));
}
static void
@@ -1317,16 +1065,15 @@ mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx,
break;
case MGMTD_BE_PROC_MSG:
tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC;
- thread_add_timer_tv(client_ctx->tm,
- mgmt_be_client_proc_msgbufs, client_ctx,
- &tv, &client_ctx->msg_proc_ev);
+ thread_add_timer_tv(client_ctx->tm, mgmt_be_client_proc_msgbufs,
+ client_ctx, &tv, &client_ctx->msg_proc_ev);
assert(client_ctx->msg_proc_ev);
break;
case MGMTD_BE_CONN_WRITES_ON:
- thread_add_timer_msec(
- client_ctx->tm, mgmt_be_client_resume_writes,
- client_ctx, MGMTD_BE_MSG_WRITE_DELAY_MSEC,
- &client_ctx->conn_writes_on);
+ thread_add_timer_msec(client_ctx->tm,
+ mgmt_be_client_resume_writes, client_ctx,
+ MGMTD_BE_MSG_WRITE_DELAY_MSEC,
+ &client_ctx->conn_writes_on);
assert(client_ctx->conn_writes_on);
break;
case MGMTD_BE_SERVER:
@@ -1376,16 +1123,10 @@ uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params,
mgmt_be_client_ctx.client_params.conn_retry_intvl_sec =
MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC;
- assert(!mgmt_be_client_ctx.ibuf_fifo && !mgmt_be_client_ctx.ibuf_work &&
- !mgmt_be_client_ctx.obuf_fifo && !mgmt_be_client_ctx.obuf_work);
-
mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head);
- mgmt_be_client_ctx.ibuf_fifo = stream_fifo_new();
- mgmt_be_client_ctx.ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- mgmt_be_client_ctx.obuf_fifo = stream_fifo_new();
- /* mgmt_be_client_ctx.obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- */
- mgmt_be_client_ctx.obuf_work = NULL;
+ mgmt_msg_init(&mgmt_be_client_ctx.mstate, MGMTD_BE_MAX_NUM_MSG_PROC,
+ MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN,
+ "BE-client");
/* Start trying to connect to MGMTD backend server immediately */
mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1);
@@ -1465,18 +1206,11 @@ void mgmt_be_client_lib_destroy(uintptr_t lib_hndl)
assert(client_ctx);
MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'",
- client_ctx->client_params.name);
+ client_ctx->client_params.name);
mgmt_be_server_disconnect(client_ctx, false);
- assert(mgmt_be_client_ctx.ibuf_fifo && mgmt_be_client_ctx.obuf_fifo);
-
- stream_fifo_free(mgmt_be_client_ctx.ibuf_fifo);
- if (mgmt_be_client_ctx.ibuf_work)
- stream_free(mgmt_be_client_ctx.ibuf_work);
- stream_fifo_free(mgmt_be_client_ctx.obuf_fifo);
- if (mgmt_be_client_ctx.obuf_work)
- stream_free(mgmt_be_client_ctx.obuf_work);
+ mgmt_msg_destroy(&client_ctx->mstate);
THREAD_OFF(client_ctx->conn_retry_tmr);
THREAD_OFF(client_ctx->conn_read_ev);