summaryrefslogtreecommitdiff
path: root/lib/mgmt_fe_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mgmt_fe_client.c')
-rw-r--r--lib/mgmt_fe_client.c413
1 files changed, 74 insertions, 339 deletions
diff --git a/lib/mgmt_fe_client.c b/lib/mgmt_fe_client.c
index 22c3a06b9e..73154401ec 100644
--- a/lib/mgmt_fe_client.c
+++ b/lib/mgmt_fe_client.c
@@ -9,6 +9,7 @@
#include "memory.h"
#include "libfrr.h"
#include "mgmt_fe_client.h"
+#include "mgmt_msg.h"
#include "mgmt_pb.h"
#include "network.h"
#include "stream.h"
@@ -55,13 +56,8 @@ struct mgmt_fe_client_ctx {
struct thread *conn_writes_on;
struct thread *msg_proc_ev;
uint32_t flags;
- uint32_t num_msg_tx;
- uint32_t num_msg_rx;
- struct stream_fifo *ibuf_fifo;
- struct stream *ibuf_work;
- struct stream_fifo *obuf_fifo;
- struct stream *obuf_work;
+ struct mgmt_msg_state mstate;
struct mgmt_fe_client_params client_params;
@@ -75,7 +71,9 @@ struct mgmt_fe_client_ctx {
static bool mgmt_debug_fe_client;
-static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {0};
+static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {
+ .conn_fd = -1,
+};
/* Forward declarations */
static void
@@ -124,9 +122,9 @@ static void
mgmt_fe_server_disconnect(struct mgmt_fe_client_ctx *client_ctx,
bool reconnect)
{
- if (client_ctx->conn_fd) {
+ if (client_ctx->conn_fd != -1) {
close(client_ctx->conn_fd);
- client_ctx->conn_fd = 0;
+ client_ctx->conn_fd = -1;
}
if (reconnect)
@@ -148,9 +146,7 @@ mgmt_fe_client_writes_on(struct mgmt_fe_client_ctx *client_ctx)
{
MGMTD_FE_CLIENT_DBG("Resume writing msgs");
UNSET_FLAG(client_ctx->flags, MGMTD_FE_CLIENT_FLAGS_WRITES_OFF);
- if (client_ctx->obuf_work
- || stream_fifo_count_safe(client_ctx->obuf_fifo))
- mgmt_fe_client_sched_msg_write(client_ctx);
+ mgmt_fe_client_sched_msg_write(client_ctx);
}
static inline void
@@ -160,101 +156,42 @@ mgmt_fe_client_writes_off(struct mgmt_fe_client_ctx *client_ctx)
MGMTD_FE_CLIENT_DBG("Paused writing msgs");
}
-static int
-mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx,
- Mgmtd__FeMessage *fe_msg)
+static int mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx,
+ Mgmtd__FeMessage *fe_msg)
{
- size_t msg_size;
- uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN];
- struct mgmt_fe_msg *msg;
-
- if (client_ctx->conn_fd == 0)
- return -1;
-
- msg_size = mgmtd__fe_message__get_packed_size(fe_msg);
- msg_size += MGMTD_FE_MSG_HDR_LEN;
- if (msg_size > sizeof(msg_buf)) {
- MGMTD_FE_CLIENT_ERR(
- "Message size %d more than max size'%d. Not sending!'",
- (int)msg_size, (int)sizeof(msg_buf));
+ /* users current expect this to fail here */
+ if (client_ctx->conn_fd == -1) {
+ MGMTD_FE_CLIENT_DBG("can't send message on closed connection");
return -1;
}
- msg = (struct mgmt_fe_msg *)msg_buf;
- msg->hdr.marker = MGMTD_FE_MSG_MARKER;
- msg->hdr.len = (uint16_t)msg_size;
- mgmtd__fe_message__pack(fe_msg, msg->payload);
-
- if (!client_ctx->obuf_work)
- client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) {
- stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
- client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- }
- stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size);
-
+ int rv = mgmt_msg_send_msg(
+ &client_ctx->mstate, fe_msg,
+ mgmtd__fe_message__get_packed_size(fe_msg),
+ (size_t(*)(void *, void *))mgmtd__fe_message__pack,
+ mgmt_debug_fe_client);
mgmt_fe_client_sched_msg_write(client_ctx);
- client_ctx->num_msg_tx++;
- return 0;
+ return rv;
}
static void mgmt_fe_client_write(struct thread *thread)
{
- int bytes_written = 0;
- int processed = 0;
- int msg_size = 0;
- struct stream *s = NULL;
- struct stream *free = NULL;
struct mgmt_fe_client_ctx *client_ctx;
+ enum mgmt_msg_wsched rv;
client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- /* Ensure pushing any pending write buffer to FIFO */
- if (client_ctx->obuf_work) {
- stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
- client_ctx->obuf_work = NULL;
- }
-
- for (s = stream_fifo_head(client_ctx->obuf_fifo);
- s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE;
- s = stream_fifo_head(client_ctx->obuf_fifo)) {
- /* msg_size = (int)stream_get_size(s); */
- msg_size = (int)STREAM_READABLE(s);
- bytes_written = stream_flush(s, client_ctx->conn_fd);
- if (bytes_written == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_fe_client_register_event(
- client_ctx, MGMTD_FE_CONN_WRITE);
- return;
- } else if (bytes_written != msg_size) {
- MGMTD_FE_CLIENT_ERR(
- "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'",
- msg_size, bytes_written, safe_strerror(errno));
- if (bytes_written > 0) {
- stream_forward_getp(s, (size_t)bytes_written);
- stream_pulldown(s);
- mgmt_fe_client_register_event(
- client_ctx, MGMTD_FE_CONN_WRITE);
- return;
- }
- mgmt_fe_server_disconnect(client_ctx, true);
- return;
- }
-
- free = stream_fifo_pop(client_ctx->obuf_fifo);
- stream_free(free);
- MGMTD_FE_CLIENT_DBG(
- "Wrote %d bytes of message to MGMTD Backend client socket.'",
- bytes_written);
- processed++;
- }
-
- if (s) {
+ rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd,
+ mgmt_debug_fe_client);
+ if (rv == MSW_SCHED_STREAM)
+ mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_WRITE);
+ else if (rv == MSW_DISCONNECT)
+ mgmt_fe_server_disconnect(client_ctx, true);
+ else if (rv == MSW_SCHED_WRITES_OFF) {
mgmt_fe_client_writes_off(client_ctx);
mgmt_fe_client_register_event(client_ctx,
- MGMTD_FE_CONN_WRITES_ON);
- }
+ MGMTD_FE_CONN_WRITES_ON);
+ } else
+ assert(rv == MSW_SCHED_NONE);
}
static void mgmt_fe_client_resume_writes(struct thread *thread)
@@ -262,7 +199,7 @@ static void mgmt_fe_client_resume_writes(struct thread *thread)
struct mgmt_fe_client_ctx *client_ctx;
client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
+ assert(client_ctx && client_ctx->conn_fd != -1);
mgmt_fe_client_writes_on(client_ctx);
}
@@ -713,271 +650,83 @@ mgmt_fe_client_handle_msg(struct mgmt_fe_client_ctx *client_ctx,
return 0;
}
-static int
-mgmt_fe_client_process_msg(struct mgmt_fe_client_ctx *client_ctx,
- uint8_t *msg_buf, int bytes_read)
+static void mgmt_fe_client_process_msg(void *user_ctx, uint8_t *data,
+ size_t len)
{
+ struct mgmt_fe_client_ctx *client_ctx = user_ctx;
Mgmtd__FeMessage *fe_msg;
- struct mgmt_fe_msg *msg;
- uint16_t bytes_left;
- uint16_t processed = 0;
- MGMTD_FE_CLIENT_DBG(
- "Have %u bytes of messages from MGMTD Frontend server to .",
- bytes_read);
-
- bytes_left = bytes_read;
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;
- bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
- msg = (struct mgmt_fe_msg *)msg_buf;
- if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) {
- MGMTD_FE_CLIENT_DBG(
- "Marker not found in message from MGMTD Frontend server.");
- break;
- }
-
- if (bytes_left < msg->hdr.len) {
- MGMTD_FE_CLIENT_DBG(
- "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend server.",
- bytes_left, msg->hdr.len);
- break;
- }
-
- fe_msg = mgmtd__fe_message__unpack(
- NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN),
- msg->payload);
- if (!fe_msg) {
- MGMTD_FE_CLIENT_DBG(
- "Failed to decode %d bytes from MGMTD Frontend server.",
- msg->hdr.len);
- continue;
- }
-
- MGMTD_FE_CLIENT_DBG(
- "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend server",
- msg->hdr.len, fe_msg->message_case,
- fe_msg->message_case);
-
- (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg);
-
- mgmtd__fe_message__free_unpacked(fe_msg, NULL);
- processed++;
- client_ctx->num_msg_rx++;
+ fe_msg = mgmtd__fe_message__unpack(NULL, len, data);
+ if (!fe_msg) {
+ MGMTD_FE_CLIENT_DBG("Failed to decode %zu bytes from server.",
+ len);
+ return;
}
-
- return processed;
+ MGMTD_FE_CLIENT_DBG(
+ "Decoded %zu bytes of message(msg: %u/%u) from server", len,
+ fe_msg->message_case, fe_msg->message_case);
+ (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg);
+ mgmtd__fe_message__free_unpacked(fe_msg, NULL);
}
static void mgmt_fe_client_proc_msgbufs(struct thread *thread)
{
struct mgmt_fe_client_ctx *client_ctx;
- struct stream *work;
- int processed = 0;
client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) {
- work = stream_fifo_pop_safe(client_ctx->ibuf_fifo);
- if (!work)
- break;
-
- processed += mgmt_fe_client_process_msg(
- client_ctx, STREAM_DATA(work), stream_get_endp(work));
-
- if (work != client_ctx->ibuf_work) {
- /* Free it up */
- stream_free(work);
- } else {
- /* Reset stream buffer for next read */
- stream_reset(work);
- }
- }
-
- /*
- * If we have more to process, reschedule for processing it.
- */
- if (stream_fifo_head(client_ctx->ibuf_fifo))
- mgmt_fe_client_register_event(client_ctx,
- MGMTD_FE_PROC_MSG);
+ if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_fe_client_process_msg,
+ client_ctx, mgmt_debug_fe_client))
+ mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
}
static void mgmt_fe_client_read(struct thread *thread)
{
struct mgmt_fe_client_ctx *client_ctx;
- int bytes_read, msg_cnt;
- size_t total_bytes, bytes_left;
- struct mgmt_fe_msg_hdr *msg_hdr;
- bool incomplete = false;
+ enum mgmt_msg_rsched rv;
client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- total_bytes = 0;
- bytes_left = STREAM_SIZE(client_ctx->ibuf_work)
- - stream_get_endp(client_ctx->ibuf_work);
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
- bytes_read = stream_read_try(client_ctx->ibuf_work,
- client_ctx->conn_fd, bytes_left);
- MGMTD_FE_CLIENT_DBG(
- "Got %d bytes of message from MGMTD Frontend server",
- bytes_read);
- /* -2 is normal nothing read, and to retry */
- if (bytes_read == -2)
- break;
- if (bytes_read <= 0) {
- if (bytes_read == 0) {
- MGMTD_FE_CLIENT_ERR(
- "Got EOF/disconnect while reading from MGMTD Frontend server.");
- } else {
- /* Fatal error */
- MGMTD_FE_CLIENT_ERR(
- "Got error (%d) while reading from MGMTD Frontend server. Err: '%s'",
- bytes_read, safe_strerror(errno));
- }
- mgmt_fe_server_disconnect(client_ctx, true);
- return;
- }
- total_bytes += bytes_read;
- bytes_left -= bytes_read;
- }
-
- /*
- * Check if we have read complete messages or not.
- */
- stream_set_getp(client_ctx->ibuf_work, 0);
- total_bytes = 0;
- msg_cnt = 0;
- bytes_left = stream_get_endp(client_ctx->ibuf_work);
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
- msg_hdr = (struct mgmt_fe_msg_hdr
- *)(STREAM_DATA(client_ctx->ibuf_work)
- + total_bytes);
- if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) {
- /* Corrupted buffer. Force disconnect?? */
- MGMTD_FE_CLIENT_ERR(
- "Received corrupted buffer from MGMTD frontend server.");
- mgmt_fe_server_disconnect(client_ctx, true);
- return;
- }
- if (msg_hdr->len > bytes_left)
- break;
-
- total_bytes += msg_hdr->len;
- bytes_left -= msg_hdr->len;
- msg_cnt++;
- }
-
- if (!msg_cnt)
- goto resched;
-
- if (bytes_left > 0)
- incomplete = true;
- /*
- * We have read one or several messages.
- * Schedule processing them now.
- */
- msg_hdr =
- (struct mgmt_fe_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work)
- + total_bytes);
- stream_set_endp(client_ctx->ibuf_work, total_bytes);
- stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work);
- client_ctx->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- if (incomplete) {
- stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left);
- stream_set_endp(client_ctx->ibuf_work, bytes_left);
+ rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd,
+ mgmt_debug_fe_client);
+ if (rv == MSR_DISCONNECT) {
+ mgmt_fe_server_disconnect(client_ctx, true);
+ return;
}
-
- mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
-
-resched:
+ if (rv == MSR_SCHED_BOTH)
+ mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ);
}
-static int mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx)
+static void mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx)
{
- int ret, sock, len;
- struct sockaddr_un addr;
-
- MGMTD_FE_CLIENT_DBG(
- "Trying to connect to MGMTD Frontend server at %s",
- MGMTD_FE_SERVER_PATH);
-
- assert(!client_ctx->conn_fd);
+ const char *dbgtag = mgmt_debug_fe_client ? "FE-client" : NULL;
- sock = socket(AF_UNIX, SOCK_STREAM, 0);
- if (sock < 0) {
- MGMTD_FE_CLIENT_ERR("Failed to create socket");
- goto mgmt_fe_server_connect_failed;
- }
+ assert(client_ctx->conn_fd == -1);
+ client_ctx->conn_fd = mgmt_msg_connect(
+ MGMTD_FE_SERVER_PATH, MGMTD_SOCKET_FE_SEND_BUF_SIZE,
+ MGMTD_SOCKET_FE_RECV_BUF_SIZE, dbgtag);
- MGMTD_FE_CLIENT_DBG(
- "Created MGMTD Frontend server socket successfully!");
-
- memset(&addr, 0, sizeof(struct sockaddr_un));
- addr.sun_family = AF_UNIX;
- strlcpy(addr.sun_path, MGMTD_FE_SERVER_PATH, sizeof(addr.sun_path));
-#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
- len = addr.sun_len = SUN_LEN(&addr);
-#else
- len = sizeof(addr.sun_family) + strlen(addr.sun_path);
-#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
-
- ret = connect(sock, (struct sockaddr *)&addr, len);
- if (ret < 0) {
- MGMTD_FE_CLIENT_ERR(
- "Failed to connect to MGMTD Frontend Server at %s. Err: %s",
- addr.sun_path, safe_strerror(errno));
- close(sock);
- goto mgmt_fe_server_connect_failed;
+ /* Send REGISTER_REQ message */
+ if (client_ctx->conn_fd == -1 ||
+ mgmt_fe_send_register_req(client_ctx) != 0) {
+ mgmt_fe_server_disconnect(client_ctx, true);
+ return;
}
- MGMTD_FE_CLIENT_DBG(
- "Connected to MGMTD Frontend Server at %s successfully!",
- addr.sun_path);
- client_ctx->conn_fd = sock;
-
- /* Make client socket non-blocking. */
- set_nonblocking(sock);
- setsockopt_so_sendbuf(client_ctx->conn_fd,
- MGMTD_SOCKET_FE_SEND_BUF_SIZE);
- setsockopt_so_recvbuf(client_ctx->conn_fd,
- MGMTD_SOCKET_FE_RECV_BUF_SIZE);
-
- thread_add_read(client_ctx->tm, mgmt_fe_client_read,
- (void *)&mgmt_fe_client_ctx, client_ctx->conn_fd,
- &client_ctx->conn_read_ev);
- assert(client_ctx->conn_read_ev);
-
- /* Send REGISTER_REQ message */
- if (mgmt_fe_send_register_req(client_ctx) != 0)
- goto mgmt_fe_server_connect_failed;
+ /* Start reading from the socket */
+ mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ);
/* Notify client through registered callback (if any) */
if (client_ctx->client_params.client_connect_notify)
(void)(*client_ctx->client_params.client_connect_notify)(
(uintptr_t)client_ctx,
client_ctx->client_params.user_data, true);
-
- return 0;
-
-mgmt_fe_server_connect_failed:
- if (sock && sock != client_ctx->conn_fd)
- close(sock);
-
- mgmt_fe_server_disconnect(client_ctx, true);
- return -1;
}
+
static void mgmt_fe_client_conn_timeout(struct thread *thread)
{
- struct mgmt_fe_client_ctx *client_ctx;
-
- client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx);
-
- mgmt_fe_server_connect(client_ctx);
+ mgmt_fe_server_connect(THREAD_ARG(thread));
}
static void
@@ -1046,16 +795,9 @@ uintptr_t mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params,
mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec =
MGMTD_FE_DEFAULT_CONN_RETRY_INTVL_SEC;
- assert(!mgmt_fe_client_ctx.ibuf_fifo
- && !mgmt_fe_client_ctx.ibuf_work
- && !mgmt_fe_client_ctx.obuf_fifo
- && !mgmt_fe_client_ctx.obuf_work);
-
- mgmt_fe_client_ctx.ibuf_fifo = stream_fifo_new();
- mgmt_fe_client_ctx.ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- mgmt_fe_client_ctx.obuf_fifo = stream_fifo_new();
-
- mgmt_fe_client_ctx.obuf_work = NULL;
+ mgmt_msg_init(&mgmt_fe_client_ctx.mstate, MGMTD_FE_MAX_NUM_MSG_PROC,
+ MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN,
+ "FE-client");
mgmt_sessions_init(&mgmt_fe_client_ctx.client_sessions);
@@ -1322,8 +1064,6 @@ void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl)
mgmt_fe_server_disconnect(client_ctx, false);
- assert(mgmt_fe_client_ctx.ibuf_fifo && mgmt_fe_client_ctx.obuf_fifo);
-
mgmt_fe_destroy_client_sessions(lib_hndl);
THREAD_OFF(client_ctx->conn_retry_tmr);
@@ -1331,10 +1071,5 @@ void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl)
THREAD_OFF(client_ctx->conn_write_ev);
THREAD_OFF(client_ctx->conn_writes_on);
THREAD_OFF(client_ctx->msg_proc_ev);
- stream_fifo_free(mgmt_fe_client_ctx.ibuf_fifo);
- if (mgmt_fe_client_ctx.ibuf_work)
- stream_free(mgmt_fe_client_ctx.ibuf_work);
- stream_fifo_free(mgmt_fe_client_ctx.obuf_fifo);
- if (mgmt_fe_client_ctx.obuf_work)
- stream_free(mgmt_fe_client_ctx.obuf_work);
+ mgmt_msg_destroy(&client_ctx->mstate);
}