#include "libfrr.h"
#include "mgmtd/mgmt.h"
#include "mgmt_be_client.h"
+#include "mgmt_msg.h"
#include "mgmt_pb.h"
#include "network.h"
#include "stream.h"
struct thread *conn_writes_on;
struct thread *msg_proc_ev;
uint32_t flags;
- uint32_t num_msg_tx;
- uint32_t num_msg_rx;
- struct stream_fifo *ibuf_fifo;
- struct stream *ibuf_work;
- struct stream_fifo *obuf_fifo;
- struct stream *obuf_work;
- uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN];
+ struct mgmt_msg_state mstate;
struct nb_config *candidate_config;
struct nb_config *running_config;
static bool mgmt_debug_be_client;
-static struct mgmt_be_client_ctx mgmt_be_client_ctx = {0};
+static struct mgmt_be_client_ctx mgmt_be_client_ctx = {
+ .conn_fd = -1,
+};
const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = {
#ifdef HAVE_STATICD
{
/* Notify client through registered callback (if any) */
if (client_ctx->client_params.client_connect_notify)
- (void)(*client_ctx->client_params
- .client_connect_notify)(
+ (void)(*client_ctx->client_params.client_connect_notify)(
(uintptr_t)client_ctx,
client_ctx->client_params.user_data, false);
- if (client_ctx->conn_fd) {
+ if (client_ctx->conn_fd != -1) {
close(client_ctx->conn_fd);
- client_ctx->conn_fd = 0;
+ client_ctx->conn_fd = -1;
}
if (reconnect)
return 0;
}
-static int
-mgmt_be_client_process_msg(struct mgmt_be_client_ctx *client_ctx,
- uint8_t *msg_buf, int bytes_read)
+static void mgmt_be_client_process_msg(void *user_ctx, uint8_t *data,
+ size_t len)
{
+ struct mgmt_be_client_ctx *client_ctx = user_ctx;
Mgmtd__BeMessage *be_msg;
- struct mgmt_be_msg *msg;
- uint16_t bytes_left;
- uint16_t processed = 0;
-
- MGMTD_BE_CLIENT_DBG(
- "Got message of %d bytes from MGMTD Backend Server",
- bytes_read);
-
- bytes_left = bytes_read;
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;
- bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
- msg = (struct mgmt_be_msg *)msg_buf;
- if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) {
- MGMTD_BE_CLIENT_DBG(
- "Marker not found in message from MGMTD '%s'",
- client_ctx->client_params.name);
- break;
- }
-
- if (bytes_left < msg->hdr.len) {
- MGMTD_BE_CLIENT_DBG(
- "Incomplete message of %d bytes (epxected: %u) from MGMTD '%s'",
- bytes_left, msg->hdr.len,
- client_ctx->client_params.name);
- break;
- }
-
- be_msg = mgmtd__be_message__unpack(
- NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN),
- msg->payload);
- if (!be_msg) {
- MGMTD_BE_CLIENT_DBG(
- "Failed to decode %d bytes from MGMTD '%s'",
- msg->hdr.len, client_ctx->client_params.name);
- continue;
- }
- (void)mgmt_be_client_handle_msg(client_ctx, be_msg);
- mgmtd__be_message__free_unpacked(be_msg, NULL);
- processed++;
- client_ctx->num_msg_rx++;
+ be_msg = mgmtd__be_message__unpack(NULL, len, data);
+ if (!be_msg) {
+ MGMTD_BE_CLIENT_DBG("Failed to decode %zu bytes from server",
+ len);
+ return;
}
-
- return processed;
+ MGMTD_BE_CLIENT_DBG(
+ "Decoded %zu bytes of message(msg: %u/%u) from server", len,
+ be_msg->message_case, be_msg->message_case);
+ (void)mgmt_be_client_handle_msg(client_ctx, be_msg);
+ mgmtd__be_message__free_unpacked(be_msg, NULL);
}
static void mgmt_be_client_proc_msgbufs(struct thread *thread)
{
- struct mgmt_be_client_ctx *client_ctx;
- struct stream *work;
- int processed = 0;
-
- client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx);
-
- if (client_ctx->conn_fd == 0)
- return;
-
- for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) {
- work = stream_fifo_pop_safe(client_ctx->ibuf_fifo);
- if (!work)
- break;
-
- processed += mgmt_be_client_process_msg(
- client_ctx, STREAM_DATA(work), stream_get_endp(work));
-
- if (work != client_ctx->ibuf_work) {
- /* Free it up */
- stream_free(work);
- } else {
- /* Reset stream buffer for next read */
- stream_reset(work);
- }
- }
+ struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread);
- /*
- * If we have more to process, reschedule for processing it.
- */
- if (stream_fifo_head(client_ctx->ibuf_fifo))
- mgmt_be_client_register_event(client_ctx,
- MGMTD_BE_PROC_MSG);
+ if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_be_client_process_msg,
+ client_ctx, mgmt_debug_be_client))
+ mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
}
static void mgmt_be_client_read(struct thread *thread)
{
- struct mgmt_be_client_ctx *client_ctx;
- int bytes_read, msg_cnt;
- size_t total_bytes, bytes_left;
- struct mgmt_be_msg_hdr *msg_hdr;
- bool incomplete = false;
-
- client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- total_bytes = 0;
- bytes_left = STREAM_SIZE(client_ctx->ibuf_work)
- - stream_get_endp(client_ctx->ibuf_work);
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
- bytes_read = stream_read_try(client_ctx->ibuf_work,
- client_ctx->conn_fd, bytes_left);
- MGMTD_BE_CLIENT_DBG(
- "Got %d bytes of message from MGMTD Backend server",
- bytes_read);
- /* -2 is normal nothing read, and to retry */
- if (bytes_read == -2)
- break;
- if (bytes_read <= 0) {
- if (bytes_read == 0) {
- MGMTD_BE_CLIENT_ERR(
- "Got EOF/disconnect while reading from MGMTD Frontend server.");
- } else {
- MGMTD_BE_CLIENT_ERR(
- "Got error (%d) while reading from MGMTD Backend server. Err: '%s'",
- bytes_read, safe_strerror(errno));
- }
- mgmt_be_server_disconnect(client_ctx, true);
- return;
- }
- total_bytes += bytes_read;
- bytes_left -= bytes_read;
- }
- /*
- * Check if we have read complete messages or not.
- */
- stream_set_getp(client_ctx->ibuf_work, 0);
- total_bytes = 0;
- msg_cnt = 0;
- bytes_left = stream_get_endp(client_ctx->ibuf_work);
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
- msg_hdr = (struct mgmt_be_msg_hdr
- *)(STREAM_DATA(client_ctx->ibuf_work)
- + total_bytes);
- if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) {
- /* Corrupted buffer. Force disconnect?? */
- MGMTD_BE_CLIENT_ERR(
- "Received corrupted buffer from MGMTD backend server.");
- mgmt_be_server_disconnect(client_ctx, true);
- return;
- }
- if (msg_hdr->len > bytes_left)
- break;
-
- total_bytes += msg_hdr->len;
- bytes_left -= msg_hdr->len;
- msg_cnt++;
- }
+ struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread);
+ enum mgmt_msg_rsched rv;
- if (!msg_cnt)
- goto resched;
-
- if (bytes_left > 0)
- incomplete = true;
-
- /*
- * We have read one or several messages.
- * Schedule processing them now.
- */
- msg_hdr =
- (struct mgmt_be_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) +
- total_bytes);
- stream_set_endp(client_ctx->ibuf_work, total_bytes);
- stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work);
- client_ctx->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- if (incomplete) {
- stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left);
- stream_set_endp(client_ctx->ibuf_work, bytes_left);
+ rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd,
+ mgmt_debug_be_client);
+ if (rv == MSR_DISCONNECT) {
+ mgmt_be_server_disconnect(client_ctx, true);
+ return;
}
- mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
-
-resched:
+ if (rv == MSR_SCHED_BOTH)
+ mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ);
}
{
MGMTD_BE_CLIENT_DBG("Resume writing msgs");
UNSET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF);
- if (client_ctx->obuf_work
- || stream_fifo_count_safe(client_ctx->obuf_fifo))
- mgmt_be_client_sched_msg_write(client_ctx);
+ mgmt_be_client_sched_msg_write(client_ctx);
}
static inline void
}
static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx,
- Mgmtd__BeMessage *be_msg)
+ Mgmtd__BeMessage *be_msg)
{
- size_t msg_size;
- uint8_t *msg_buf = client_ctx->msg_buf;
- struct mgmt_be_msg *msg;
-
- if (client_ctx->conn_fd == 0)
- return -1;
-
- msg_size = mgmtd__be_message__get_packed_size(be_msg);
- msg_size += MGMTD_BE_MSG_HDR_LEN;
- if (msg_size > MGMTD_BE_MSG_MAX_LEN) {
- MGMTD_BE_CLIENT_ERR(
- "Message size %d more than max size'%d. Not sending!'",
- (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN);
+ if (client_ctx->conn_fd == -1) {
+ MGMTD_BE_CLIENT_DBG("can't send message on closed connection");
return -1;
}
- msg = (struct mgmt_be_msg *)msg_buf;
- msg->hdr.marker = MGMTD_BE_MSG_MARKER;
- msg->hdr.len = (uint16_t)msg_size;
- mgmtd__be_message__pack(be_msg, msg->payload);
-
- if (!client_ctx->obuf_work)
- client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) {
- stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
- client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- }
- stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size);
-
+ int rv = mgmt_msg_send_msg(
+ &client_ctx->mstate, be_msg,
+ mgmtd__be_message__get_packed_size(be_msg),
+ (size_t(*)(void *, void *))mgmtd__be_message__pack,
+ mgmt_debug_be_client);
mgmt_be_client_sched_msg_write(client_ctx);
- client_ctx->num_msg_tx++;
- return 0;
+ return rv;
}
static void mgmt_be_client_write(struct thread *thread)
{
- int bytes_written = 0;
- int processed = 0;
- int msg_size = 0;
- struct stream *s = NULL;
- struct stream *free = NULL;
- struct mgmt_be_client_ctx *client_ctx;
-
- client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- /* Ensure pushing any pending write buffer to FIFO */
- if (client_ctx->obuf_work) {
- stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
- client_ctx->obuf_work = NULL;
- }
-
- for (s = stream_fifo_head(client_ctx->obuf_fifo);
- s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE;
- s = stream_fifo_head(client_ctx->obuf_fifo)) {
- /* msg_size = (int)stream_get_size(s); */
- msg_size = (int)STREAM_READABLE(s);
- bytes_written = stream_flush(s, client_ctx->conn_fd);
- if (bytes_written == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_be_client_register_event(
- client_ctx, MGMTD_BE_CONN_WRITE);
- return;
- } else if (bytes_written != msg_size) {
- MGMTD_BE_CLIENT_ERR(
- "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'",
- msg_size, bytes_written, safe_strerror(errno));
- if (bytes_written > 0) {
- stream_forward_getp(s, (size_t)bytes_written);
- stream_pulldown(s);
- mgmt_be_client_register_event(
- client_ctx, MGMTD_BE_CONN_WRITE);
- return;
- }
- mgmt_be_server_disconnect(client_ctx, true);
- return;
- }
-
- free = stream_fifo_pop(client_ctx->obuf_fifo);
- stream_free(free);
- MGMTD_BE_CLIENT_DBG(
- "Wrote %d bytes of message to MGMTD Backend client socket.'",
- bytes_written);
- processed++;
- }
-
- if (s) {
+ struct mgmt_be_client_ctx *client_ctx = THREAD_ARG(thread);
+ enum mgmt_msg_wsched rv;
+
+ rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd,
+ mgmt_debug_be_client);
+ if (rv == MSW_SCHED_STREAM)
+ mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_WRITE);
+ else if (rv == MSW_DISCONNECT)
+ mgmt_be_server_disconnect(client_ctx, true);
+ else if (rv == MSW_SCHED_WRITES_OFF) {
mgmt_be_client_writes_off(client_ctx);
mgmt_be_client_register_event(client_ctx,
- MGMTD_BE_CONN_WRITES_ON);
- }
+ MGMTD_BE_CONN_WRITES_ON);
+ } else
+ assert(rv == MSW_SCHED_NONE);
}
static void mgmt_be_client_resume_writes(struct thread *thread)
struct mgmt_be_client_ctx *client_ctx;
client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
+ assert(client_ctx && client_ctx->conn_fd != -1);
mgmt_be_client_writes_on(client_ctx);
}
static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx,
- bool subscr_xpaths,
- uint16_t num_reg_xpaths,
- char **reg_xpaths)
+ bool subscr_xpaths, uint16_t num_reg_xpaths,
+ char **reg_xpaths)
{
Mgmtd__BeMessage be_msg;
Mgmtd__BeSubscribeReq subscr_req;
return mgmt_be_client_send_msg(client_ctx, &be_msg);
}
-static int mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx)
+static void mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx)
{
- int ret, sock, len;
- struct sockaddr_un addr;
-
- MGMTD_BE_CLIENT_DBG("Trying to connect to MGMTD Backend server at %s",
- MGMTD_BE_SERVER_PATH);
-
- assert(!client_ctx->conn_fd);
+ const char *dbgtag = mgmt_debug_be_client ? "BE-client" : NULL;
- sock = socket(AF_UNIX, SOCK_STREAM, 0);
- if (sock < 0) {
- MGMTD_BE_CLIENT_ERR("Failed to create socket");
- goto mgmt_be_server_connect_failed;
- }
-
- MGMTD_BE_CLIENT_DBG(
- "Created MGMTD Backend server socket successfully!");
+ assert(client_ctx->conn_fd == -1);
+ client_ctx->conn_fd = mgmt_msg_connect(
+ MGMTD_BE_SERVER_PATH, MGMTD_SOCKET_BE_SEND_BUF_SIZE,
+ MGMTD_SOCKET_BE_RECV_BUF_SIZE, dbgtag);
- memset(&addr, 0, sizeof(struct sockaddr_un));
- addr.sun_family = AF_UNIX;
- strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path));
-#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
- len = addr.sun_len = SUN_LEN(&addr);
-#else
- len = sizeof(addr.sun_family) + strlen(addr.sun_path);
-#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
-
- ret = connect(sock, (struct sockaddr *)&addr, len);
- if (ret < 0) {
- MGMTD_BE_CLIENT_ERR(
- "Failed to connect to MGMTD Backend Server at %s. Err: %s",
- addr.sun_path, safe_strerror(errno));
- close(sock);
- goto mgmt_be_server_connect_failed;
+ /* Send SUBSCRIBE_REQ message */
+ if (client_ctx->conn_fd == -1 ||
+ mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) {
+ mgmt_be_server_disconnect(client_ctx, true);
+ return;
}
- MGMTD_BE_CLIENT_DBG(
- "Connected to MGMTD Backend Server at %s successfully!",
- addr.sun_path);
- client_ctx->conn_fd = sock;
-
- /* Make client socket non-blocking. */
- set_nonblocking(sock);
- setsockopt_so_sendbuf(client_ctx->conn_fd,
- MGMTD_SOCKET_BE_SEND_BUF_SIZE);
- setsockopt_so_recvbuf(client_ctx->conn_fd,
- MGMTD_SOCKET_BE_RECV_BUF_SIZE);
-
+ /* Start reading from the socket */
mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ);
/* Notify client through registered callback (if any) */
if (client_ctx->client_params.client_connect_notify)
- (void)(*client_ctx->client_params
- .client_connect_notify)(
+ (void)(*client_ctx->client_params.client_connect_notify)(
(uintptr_t)client_ctx,
client_ctx->client_params.user_data, true);
-
- /* Send SUBSCRIBE_REQ message */
- if (mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0)
- goto mgmt_be_server_connect_failed;
-
- return 0;
-
-mgmt_be_server_connect_failed:
- if (sock && sock != client_ctx->conn_fd)
- close(sock);
-
- mgmt_be_server_disconnect(client_ctx, true);
- return -1;
}
static void mgmt_be_client_conn_timeout(struct thread *thread)
{
- struct mgmt_be_client_ctx *client_ctx;
-
- client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx);
-
- mgmt_be_server_connect(client_ctx);
+ mgmt_be_server_connect(THREAD_ARG(thread));
}
static void
break;
case MGMTD_BE_PROC_MSG:
tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC;
- thread_add_timer_tv(client_ctx->tm,
- mgmt_be_client_proc_msgbufs, client_ctx,
- &tv, &client_ctx->msg_proc_ev);
+ thread_add_timer_tv(client_ctx->tm, mgmt_be_client_proc_msgbufs,
+ client_ctx, &tv, &client_ctx->msg_proc_ev);
assert(client_ctx->msg_proc_ev);
break;
case MGMTD_BE_CONN_WRITES_ON:
- thread_add_timer_msec(
- client_ctx->tm, mgmt_be_client_resume_writes,
- client_ctx, MGMTD_BE_MSG_WRITE_DELAY_MSEC,
- &client_ctx->conn_writes_on);
+ thread_add_timer_msec(client_ctx->tm,
+ mgmt_be_client_resume_writes, client_ctx,
+ MGMTD_BE_MSG_WRITE_DELAY_MSEC,
+ &client_ctx->conn_writes_on);
assert(client_ctx->conn_writes_on);
break;
case MGMTD_BE_SERVER:
mgmt_be_client_ctx.client_params.conn_retry_intvl_sec =
MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC;
- assert(!mgmt_be_client_ctx.ibuf_fifo && !mgmt_be_client_ctx.ibuf_work &&
- !mgmt_be_client_ctx.obuf_fifo && !mgmt_be_client_ctx.obuf_work);
-
mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head);
- mgmt_be_client_ctx.ibuf_fifo = stream_fifo_new();
- mgmt_be_client_ctx.ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- mgmt_be_client_ctx.obuf_fifo = stream_fifo_new();
- /* mgmt_be_client_ctx.obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- */
- mgmt_be_client_ctx.obuf_work = NULL;
+ mgmt_msg_init(&mgmt_be_client_ctx.mstate, MGMTD_BE_MAX_NUM_MSG_PROC,
+ MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN,
+ "BE-client");
/* Start trying to connect to MGMTD backend server immediately */
mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1);
assert(client_ctx);
MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'",
- client_ctx->client_params.name);
+ client_ctx->client_params.name);
mgmt_be_server_disconnect(client_ctx, false);
- assert(mgmt_be_client_ctx.ibuf_fifo && mgmt_be_client_ctx.obuf_fifo);
-
- stream_fifo_free(mgmt_be_client_ctx.ibuf_fifo);
- if (mgmt_be_client_ctx.ibuf_work)
- stream_free(mgmt_be_client_ctx.ibuf_work);
- stream_fifo_free(mgmt_be_client_ctx.obuf_fifo);
- if (mgmt_be_client_ctx.obuf_work)
- stream_free(mgmt_be_client_ctx.obuf_work);
+ mgmt_msg_destroy(&client_ctx->mstate);
THREAD_OFF(client_ctx->conn_retry_tmr);
THREAD_OFF(client_ctx->conn_read_ev);
extern "C" {
#endif
+#include "northbound.h"
+#include "mgmt_pb.h"
#include "mgmtd/mgmt_defines.h"
/***************************************************************
#define MGMTD_BE_MAX_CLIENTS_PER_XPATH_REG 32
-struct mgmt_be_msg_hdr {
- uint16_t marker;
- uint16_t len; /* Includes header */
-};
-#define MGMTD_BE_MSG_HDR_LEN sizeof(struct mgmt_be_msg_hdr)
-#define MGMTD_BE_MSG_MARKER 0xfeed
-
-struct mgmt_be_msg {
- struct mgmt_be_msg_hdr hdr;
- uint8_t payload[];
-};
-
struct mgmt_be_client_txn_ctx {
uintptr_t *user_ctx;
};
#include "memory.h"
#include "libfrr.h"
#include "mgmt_fe_client.h"
+#include "mgmt_msg.h"
#include "mgmt_pb.h"
#include "network.h"
#include "stream.h"
struct thread *conn_writes_on;
struct thread *msg_proc_ev;
uint32_t flags;
- uint32_t num_msg_tx;
- uint32_t num_msg_rx;
- struct stream_fifo *ibuf_fifo;
- struct stream *ibuf_work;
- struct stream_fifo *obuf_fifo;
- struct stream *obuf_work;
+ struct mgmt_msg_state mstate;
struct mgmt_fe_client_params client_params;
static bool mgmt_debug_fe_client;
-static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {0};
+static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {
+ .conn_fd = -1,
+};
/* Forward declarations */
static void
mgmt_fe_server_disconnect(struct mgmt_fe_client_ctx *client_ctx,
bool reconnect)
{
- if (client_ctx->conn_fd) {
+ if (client_ctx->conn_fd != -1) {
close(client_ctx->conn_fd);
- client_ctx->conn_fd = 0;
+ client_ctx->conn_fd = -1;
}
if (reconnect)
{
MGMTD_FE_CLIENT_DBG("Resume writing msgs");
UNSET_FLAG(client_ctx->flags, MGMTD_FE_CLIENT_FLAGS_WRITES_OFF);
- if (client_ctx->obuf_work
- || stream_fifo_count_safe(client_ctx->obuf_fifo))
- mgmt_fe_client_sched_msg_write(client_ctx);
+ mgmt_fe_client_sched_msg_write(client_ctx);
}
static inline void
MGMTD_FE_CLIENT_DBG("Paused writing msgs");
}
-static int
-mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx,
- Mgmtd__FeMessage *fe_msg)
+static int mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx,
+ Mgmtd__FeMessage *fe_msg)
{
- size_t msg_size;
- uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN];
- struct mgmt_fe_msg *msg;
-
- if (client_ctx->conn_fd == 0)
- return -1;
-
- msg_size = mgmtd__fe_message__get_packed_size(fe_msg);
- msg_size += MGMTD_FE_MSG_HDR_LEN;
- if (msg_size > sizeof(msg_buf)) {
- MGMTD_FE_CLIENT_ERR(
- "Message size %d more than max size'%d. Not sending!'",
- (int)msg_size, (int)sizeof(msg_buf));
+ /* users current expect this to fail here */
+ if (client_ctx->conn_fd == -1) {
+ MGMTD_FE_CLIENT_DBG("can't send message on closed connection");
return -1;
}
- msg = (struct mgmt_fe_msg *)msg_buf;
- msg->hdr.marker = MGMTD_FE_MSG_MARKER;
- msg->hdr.len = (uint16_t)msg_size;
- mgmtd__fe_message__pack(fe_msg, msg->payload);
-
- if (!client_ctx->obuf_work)
- client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) {
- stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
- client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- }
- stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size);
-
+ int rv = mgmt_msg_send_msg(
+ &client_ctx->mstate, fe_msg,
+ mgmtd__fe_message__get_packed_size(fe_msg),
+ (size_t(*)(void *, void *))mgmtd__fe_message__pack,
+ mgmt_debug_fe_client);
mgmt_fe_client_sched_msg_write(client_ctx);
- client_ctx->num_msg_tx++;
- return 0;
+ return rv;
}
static void mgmt_fe_client_write(struct thread *thread)
{
- int bytes_written = 0;
- int processed = 0;
- int msg_size = 0;
- struct stream *s = NULL;
- struct stream *free = NULL;
struct mgmt_fe_client_ctx *client_ctx;
+ enum mgmt_msg_wsched rv;
client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- /* Ensure pushing any pending write buffer to FIFO */
- if (client_ctx->obuf_work) {
- stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work);
- client_ctx->obuf_work = NULL;
- }
-
- for (s = stream_fifo_head(client_ctx->obuf_fifo);
- s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE;
- s = stream_fifo_head(client_ctx->obuf_fifo)) {
- /* msg_size = (int)stream_get_size(s); */
- msg_size = (int)STREAM_READABLE(s);
- bytes_written = stream_flush(s, client_ctx->conn_fd);
- if (bytes_written == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_fe_client_register_event(
- client_ctx, MGMTD_FE_CONN_WRITE);
- return;
- } else if (bytes_written != msg_size) {
- MGMTD_FE_CLIENT_ERR(
- "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'",
- msg_size, bytes_written, safe_strerror(errno));
- if (bytes_written > 0) {
- stream_forward_getp(s, (size_t)bytes_written);
- stream_pulldown(s);
- mgmt_fe_client_register_event(
- client_ctx, MGMTD_FE_CONN_WRITE);
- return;
- }
- mgmt_fe_server_disconnect(client_ctx, true);
- return;
- }
-
- free = stream_fifo_pop(client_ctx->obuf_fifo);
- stream_free(free);
- MGMTD_FE_CLIENT_DBG(
- "Wrote %d bytes of message to MGMTD Backend client socket.'",
- bytes_written);
- processed++;
- }
-
- if (s) {
+ rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd,
+ mgmt_debug_fe_client);
+ if (rv == MSW_SCHED_STREAM)
+ mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_WRITE);
+ else if (rv == MSW_DISCONNECT)
+ mgmt_fe_server_disconnect(client_ctx, true);
+ else if (rv == MSW_SCHED_WRITES_OFF) {
mgmt_fe_client_writes_off(client_ctx);
mgmt_fe_client_register_event(client_ctx,
- MGMTD_FE_CONN_WRITES_ON);
- }
+ MGMTD_FE_CONN_WRITES_ON);
+ } else
+ assert(rv == MSW_SCHED_NONE);
}
static void mgmt_fe_client_resume_writes(struct thread *thread)
struct mgmt_fe_client_ctx *client_ctx;
client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
+ assert(client_ctx && client_ctx->conn_fd != -1);
mgmt_fe_client_writes_on(client_ctx);
}
return 0;
}
-static int
-mgmt_fe_client_process_msg(struct mgmt_fe_client_ctx *client_ctx,
- uint8_t *msg_buf, int bytes_read)
+static void mgmt_fe_client_process_msg(void *user_ctx, uint8_t *data,
+ size_t len)
{
+ struct mgmt_fe_client_ctx *client_ctx = user_ctx;
Mgmtd__FeMessage *fe_msg;
- struct mgmt_fe_msg *msg;
- uint16_t bytes_left;
- uint16_t processed = 0;
- MGMTD_FE_CLIENT_DBG(
- "Have %u bytes of messages from MGMTD Frontend server to .",
- bytes_read);
-
- bytes_left = bytes_read;
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;
- bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
- msg = (struct mgmt_fe_msg *)msg_buf;
- if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) {
- MGMTD_FE_CLIENT_DBG(
- "Marker not found in message from MGMTD Frontend server.");
- break;
- }
-
- if (bytes_left < msg->hdr.len) {
- MGMTD_FE_CLIENT_DBG(
- "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend server.",
- bytes_left, msg->hdr.len);
- break;
- }
-
- fe_msg = mgmtd__fe_message__unpack(
- NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN),
- msg->payload);
- if (!fe_msg) {
- MGMTD_FE_CLIENT_DBG(
- "Failed to decode %d bytes from MGMTD Frontend server.",
- msg->hdr.len);
- continue;
- }
-
- MGMTD_FE_CLIENT_DBG(
- "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend server",
- msg->hdr.len, fe_msg->message_case,
- fe_msg->message_case);
-
- (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg);
-
- mgmtd__fe_message__free_unpacked(fe_msg, NULL);
- processed++;
- client_ctx->num_msg_rx++;
+ fe_msg = mgmtd__fe_message__unpack(NULL, len, data);
+ if (!fe_msg) {
+ MGMTD_FE_CLIENT_DBG("Failed to decode %zu bytes from server.",
+ len);
+ return;
}
-
- return processed;
+ MGMTD_FE_CLIENT_DBG(
+ "Decoded %zu bytes of message(msg: %u/%u) from server", len,
+ fe_msg->message_case, fe_msg->message_case);
+ (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg);
+ mgmtd__fe_message__free_unpacked(fe_msg, NULL);
}
static void mgmt_fe_client_proc_msgbufs(struct thread *thread)
{
struct mgmt_fe_client_ctx *client_ctx;
- struct stream *work;
- int processed = 0;
client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) {
- work = stream_fifo_pop_safe(client_ctx->ibuf_fifo);
- if (!work)
- break;
-
- processed += mgmt_fe_client_process_msg(
- client_ctx, STREAM_DATA(work), stream_get_endp(work));
-
- if (work != client_ctx->ibuf_work) {
- /* Free it up */
- stream_free(work);
- } else {
- /* Reset stream buffer for next read */
- stream_reset(work);
- }
- }
-
- /*
- * If we have more to process, reschedule for processing it.
- */
- if (stream_fifo_head(client_ctx->ibuf_fifo))
- mgmt_fe_client_register_event(client_ctx,
- MGMTD_FE_PROC_MSG);
+ if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_fe_client_process_msg,
+ client_ctx, mgmt_debug_fe_client))
+ mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
}
static void mgmt_fe_client_read(struct thread *thread)
{
struct mgmt_fe_client_ctx *client_ctx;
- int bytes_read, msg_cnt;
- size_t total_bytes, bytes_left;
- struct mgmt_fe_msg_hdr *msg_hdr;
- bool incomplete = false;
+ enum mgmt_msg_rsched rv;
client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx && client_ctx->conn_fd);
-
- total_bytes = 0;
- bytes_left = STREAM_SIZE(client_ctx->ibuf_work)
- - stream_get_endp(client_ctx->ibuf_work);
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
- bytes_read = stream_read_try(client_ctx->ibuf_work,
- client_ctx->conn_fd, bytes_left);
- MGMTD_FE_CLIENT_DBG(
- "Got %d bytes of message from MGMTD Frontend server",
- bytes_read);
- /* -2 is normal nothing read, and to retry */
- if (bytes_read == -2)
- break;
- if (bytes_read <= 0) {
- if (bytes_read == 0) {
- MGMTD_FE_CLIENT_ERR(
- "Got EOF/disconnect while reading from MGMTD Frontend server.");
- } else {
- /* Fatal error */
- MGMTD_FE_CLIENT_ERR(
- "Got error (%d) while reading from MGMTD Frontend server. Err: '%s'",
- bytes_read, safe_strerror(errno));
- }
- mgmt_fe_server_disconnect(client_ctx, true);
- return;
- }
- total_bytes += bytes_read;
- bytes_left -= bytes_read;
- }
-
- /*
- * Check if we have read complete messages or not.
- */
- stream_set_getp(client_ctx->ibuf_work, 0);
- total_bytes = 0;
- msg_cnt = 0;
- bytes_left = stream_get_endp(client_ctx->ibuf_work);
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
- msg_hdr = (struct mgmt_fe_msg_hdr
- *)(STREAM_DATA(client_ctx->ibuf_work)
- + total_bytes);
- if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) {
- /* Corrupted buffer. Force disconnect?? */
- MGMTD_FE_CLIENT_ERR(
- "Received corrupted buffer from MGMTD frontend server.");
- mgmt_fe_server_disconnect(client_ctx, true);
- return;
- }
- if (msg_hdr->len > bytes_left)
- break;
-
- total_bytes += msg_hdr->len;
- bytes_left -= msg_hdr->len;
- msg_cnt++;
- }
-
- if (!msg_cnt)
- goto resched;
-
- if (bytes_left > 0)
- incomplete = true;
- /*
- * We have read one or several messages.
- * Schedule processing them now.
- */
- msg_hdr =
- (struct mgmt_fe_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work)
- + total_bytes);
- stream_set_endp(client_ctx->ibuf_work, total_bytes);
- stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work);
- client_ctx->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- if (incomplete) {
- stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left);
- stream_set_endp(client_ctx->ibuf_work, bytes_left);
+ rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd,
+ mgmt_debug_fe_client);
+ if (rv == MSR_DISCONNECT) {
+ mgmt_fe_server_disconnect(client_ctx, true);
+ return;
}
-
- mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
-
-resched:
+ if (rv == MSR_SCHED_BOTH)
+ mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ);
}
-static int mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx)
+static void mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx)
{
- int ret, sock, len;
- struct sockaddr_un addr;
-
- MGMTD_FE_CLIENT_DBG(
- "Trying to connect to MGMTD Frontend server at %s",
- MGMTD_FE_SERVER_PATH);
-
- assert(!client_ctx->conn_fd);
+ const char *dbgtag = mgmt_debug_fe_client ? "FE-client" : NULL;
- sock = socket(AF_UNIX, SOCK_STREAM, 0);
- if (sock < 0) {
- MGMTD_FE_CLIENT_ERR("Failed to create socket");
- goto mgmt_fe_server_connect_failed;
- }
+ assert(client_ctx->conn_fd == -1);
+ client_ctx->conn_fd = mgmt_msg_connect(
+ MGMTD_FE_SERVER_PATH, MGMTD_SOCKET_FE_SEND_BUF_SIZE,
+ MGMTD_SOCKET_FE_RECV_BUF_SIZE, dbgtag);
- MGMTD_FE_CLIENT_DBG(
- "Created MGMTD Frontend server socket successfully!");
-
- memset(&addr, 0, sizeof(struct sockaddr_un));
- addr.sun_family = AF_UNIX;
- strlcpy(addr.sun_path, MGMTD_FE_SERVER_PATH, sizeof(addr.sun_path));
-#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
- len = addr.sun_len = SUN_LEN(&addr);
-#else
- len = sizeof(addr.sun_family) + strlen(addr.sun_path);
-#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
-
- ret = connect(sock, (struct sockaddr *)&addr, len);
- if (ret < 0) {
- MGMTD_FE_CLIENT_ERR(
- "Failed to connect to MGMTD Frontend Server at %s. Err: %s",
- addr.sun_path, safe_strerror(errno));
- close(sock);
- goto mgmt_fe_server_connect_failed;
+ /* Send REGISTER_REQ message */
+ if (client_ctx->conn_fd == -1 ||
+ mgmt_fe_send_register_req(client_ctx) != 0) {
+ mgmt_fe_server_disconnect(client_ctx, true);
+ return;
}
- MGMTD_FE_CLIENT_DBG(
- "Connected to MGMTD Frontend Server at %s successfully!",
- addr.sun_path);
- client_ctx->conn_fd = sock;
-
- /* Make client socket non-blocking. */
- set_nonblocking(sock);
- setsockopt_so_sendbuf(client_ctx->conn_fd,
- MGMTD_SOCKET_FE_SEND_BUF_SIZE);
- setsockopt_so_recvbuf(client_ctx->conn_fd,
- MGMTD_SOCKET_FE_RECV_BUF_SIZE);
-
- thread_add_read(client_ctx->tm, mgmt_fe_client_read,
- (void *)&mgmt_fe_client_ctx, client_ctx->conn_fd,
- &client_ctx->conn_read_ev);
- assert(client_ctx->conn_read_ev);
-
- /* Send REGISTER_REQ message */
- if (mgmt_fe_send_register_req(client_ctx) != 0)
- goto mgmt_fe_server_connect_failed;
+ /* Start reading from the socket */
+ mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ);
/* Notify client through registered callback (if any) */
if (client_ctx->client_params.client_connect_notify)
(void)(*client_ctx->client_params.client_connect_notify)(
(uintptr_t)client_ctx,
client_ctx->client_params.user_data, true);
-
- return 0;
-
-mgmt_fe_server_connect_failed:
- if (sock && sock != client_ctx->conn_fd)
- close(sock);
-
- mgmt_fe_server_disconnect(client_ctx, true);
- return -1;
}
+
static void mgmt_fe_client_conn_timeout(struct thread *thread)
{
- struct mgmt_fe_client_ctx *client_ctx;
-
- client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread);
- assert(client_ctx);
-
- mgmt_fe_server_connect(client_ctx);
+ mgmt_fe_server_connect(THREAD_ARG(thread));
}
static void
mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec =
MGMTD_FE_DEFAULT_CONN_RETRY_INTVL_SEC;
- assert(!mgmt_fe_client_ctx.ibuf_fifo
- && !mgmt_fe_client_ctx.ibuf_work
- && !mgmt_fe_client_ctx.obuf_fifo
- && !mgmt_fe_client_ctx.obuf_work);
-
- mgmt_fe_client_ctx.ibuf_fifo = stream_fifo_new();
- mgmt_fe_client_ctx.ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- mgmt_fe_client_ctx.obuf_fifo = stream_fifo_new();
-
- mgmt_fe_client_ctx.obuf_work = NULL;
+ mgmt_msg_init(&mgmt_fe_client_ctx.mstate, MGMTD_FE_MAX_NUM_MSG_PROC,
+ MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN,
+ "FE-client");
mgmt_sessions_init(&mgmt_fe_client_ctx.client_sessions);
mgmt_fe_server_disconnect(client_ctx, false);
- assert(mgmt_fe_client_ctx.ibuf_fifo && mgmt_fe_client_ctx.obuf_fifo);
-
mgmt_fe_destroy_client_sessions(lib_hndl);
THREAD_OFF(client_ctx->conn_retry_tmr);
THREAD_OFF(client_ctx->conn_write_ev);
THREAD_OFF(client_ctx->conn_writes_on);
THREAD_OFF(client_ctx->msg_proc_ev);
- stream_fifo_free(mgmt_fe_client_ctx.ibuf_fifo);
- if (mgmt_fe_client_ctx.ibuf_work)
- stream_free(mgmt_fe_client_ctx.ibuf_work);
- stream_fifo_free(mgmt_fe_client_ctx.obuf_fifo);
- if (mgmt_fe_client_ctx.obuf_work)
- stream_free(mgmt_fe_client_ctx.obuf_work);
+ mgmt_msg_destroy(&client_ctx->mstate);
}
extern "C" {
#endif
-#include "mgmtd/mgmt_defines.h"
#include "mgmt_pb.h"
+#include "thread.h"
+#include "mgmtd/mgmt_defines.h"
/***************************************************************
* Macros
#define MGMTD_DS_OPERATIONAL MGMTD__DATASTORE_ID__OPERATIONAL_DS
#define MGMTD_DS_MAX_ID MGMTD_DS_OPERATIONAL + 1
-struct mgmt_fe_msg_hdr {
- uint16_t marker;
- uint16_t len; /* Includes header */
-};
-#define MGMTD_FE_MSG_HDR_LEN sizeof(struct mgmt_fe_msg_hdr)
-#define MGMTD_FE_MSG_MARKER 0xdeaf
-
-struct mgmt_fe_msg {
- struct mgmt_fe_msg_hdr hdr;
- uint8_t payload[];
-};
-
/*
* All the client specific information this library needs to
* initialize itself, setup connection with MGMTD FrontEnd interface
#include "sockopt.h"
#include "network.h"
#include "libfrr.h"
+#include "mgmt_msg.h"
#include "mgmt_pb.h"
#include "mgmtd/mgmt.h"
#include "mgmtd/mgmt_memory.h"
{
MGMTD_BE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name);
UNSET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF);
- if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo))
- mgmt_be_adapter_sched_msg_write(adapter);
+ mgmt_be_adapter_sched_msg_write(adapter);
}
static inline void
}
static int mgmt_be_adapter_send_msg(struct mgmt_be_client_adapter *adapter,
- Mgmtd__BeMessage *be_msg)
+ Mgmtd__BeMessage *be_msg)
{
- size_t msg_size;
- uint8_t *msg_buf = adapter->msg_buf;
- struct mgmt_be_msg *msg;
-
- if (adapter->conn_fd < 0)
- return -1;
-
- msg_size = mgmtd__be_message__get_packed_size(be_msg);
- msg_size += MGMTD_BE_MSG_HDR_LEN;
- if (msg_size > MGMTD_BE_MSG_MAX_LEN) {
- MGMTD_BE_ADAPTER_ERR(
- "Message size %d more than max size'%d. Not sending!'",
- (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN);
+ if (adapter->conn_fd == -1) {
+ MGMTD_BE_ADAPTER_DBG("can't send message on closed connection");
return -1;
}
- msg = (struct mgmt_be_msg *)msg_buf;
- msg->hdr.marker = MGMTD_BE_MSG_MARKER;
- msg->hdr.len = (uint16_t)msg_size;
- mgmtd__be_message__pack(be_msg, msg->payload);
-
- if (!adapter->obuf_work)
- adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) {
- stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
- adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- }
- stream_write(adapter->obuf_work, (void *)msg_buf, msg_size);
-
+ int rv = mgmt_msg_send_msg(
+ &adapter->mstate, be_msg,
+ mgmtd__be_message__get_packed_size(be_msg),
+ (size_t(*)(void *, void *))mgmtd__be_message__pack,
+ mgmt_debug_be);
mgmt_be_adapter_sched_msg_write(adapter);
- adapter->num_msg_tx++;
- return 0;
+ return rv;
}
static int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter,
return mgmt_be_adapter_send_msg(adapter, &be_msg);
}
-static uint16_t
-mgmt_be_adapter_process_msg(struct mgmt_be_client_adapter *adapter,
- uint8_t *msg_buf, uint16_t bytes_read)
+static void mgmt_be_adapter_process_msg(void *user_ctx, uint8_t *data,
+ size_t len)
{
+ struct mgmt_be_client_adapter *adapter = user_ctx;
Mgmtd__BeMessage *be_msg;
- struct mgmt_be_msg *msg;
- uint16_t bytes_left;
- uint16_t processed = 0;
-
- bytes_left = bytes_read;
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;
- bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
- msg = (struct mgmt_be_msg *)msg_buf;
- if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) {
- MGMTD_BE_ADAPTER_DBG(
- "Marker not found in message from MGMTD Backend adapter '%s'",
- adapter->name);
- break;
- }
-
- if (bytes_left < msg->hdr.len) {
- MGMTD_BE_ADAPTER_DBG(
- "Incomplete message of %d bytes (epxected: %u) from MGMTD Backend adapter '%s'",
- bytes_left, msg->hdr.len, adapter->name);
- break;
- }
- be_msg = mgmtd__be_message__unpack(
- NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN),
- msg->payload);
- if (!be_msg) {
- MGMTD_BE_ADAPTER_DBG(
- "Failed to decode %d bytes from MGMTD Backend adapter '%s'",
- msg->hdr.len, adapter->name);
- continue;
- }
-
- (void)mgmt_be_adapter_handle_msg(adapter, be_msg);
- mgmtd__be_message__free_unpacked(be_msg, NULL);
- processed++;
- adapter->num_msg_rx++;
+ be_msg = mgmtd__be_message__unpack(NULL, len, data);
+ if (!be_msg) {
+ MGMTD_BE_ADAPTER_DBG(
+ "Failed to decode %zu bytes for adapter: %s", len,
+ adapter->name);
+ return;
}
-
- return processed;
+ MGMTD_BE_ADAPTER_DBG("Decoded %zu bytes of message: %u for adapter: %s",
+ len, be_msg->message_case, adapter->name);
+ (void)mgmt_be_adapter_handle_msg(adapter, be_msg);
+ mgmtd__be_message__free_unpacked(be_msg, NULL);
}
static void mgmt_be_adapter_proc_msgbufs(struct thread *thread)
{
- struct mgmt_be_client_adapter *adapter;
- struct stream *work;
- int processed = 0;
-
- adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread);
- assert(adapter);
-
- if (adapter->conn_fd < 0)
- return;
-
- for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) {
- work = stream_fifo_pop_safe(adapter->ibuf_fifo);
- if (!work)
- break;
-
- processed += mgmt_be_adapter_process_msg(
- adapter, STREAM_DATA(work), stream_get_endp(work));
-
- if (work != adapter->ibuf_work) {
- /* Free it up */
- stream_free(work);
- } else {
- /* Reset stream buffer for next read */
- stream_reset(work);
- }
- }
+ struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread);
- /*
- * If we have more to process, reschedule for processing it.
- */
- if (stream_fifo_head(adapter->ibuf_fifo))
+ if (mgmt_msg_procbufs(&adapter->mstate, mgmt_be_adapter_process_msg,
+ adapter, mgmt_debug_be))
mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG);
}
static void mgmt_be_adapter_read(struct thread *thread)
{
struct mgmt_be_client_adapter *adapter;
- int bytes_read, msg_cnt;
- size_t total_bytes, bytes_left;
- struct mgmt_be_msg_hdr *msg_hdr;
- bool incomplete = false;
+ enum mgmt_msg_rsched rv;
adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd >= 0);
- total_bytes = 0;
- bytes_left = STREAM_SIZE(adapter->ibuf_work) -
- stream_get_endp(adapter->ibuf_work);
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
- bytes_read = stream_read_try(adapter->ibuf_work,
- adapter->conn_fd, bytes_left);
- MGMTD_BE_ADAPTER_DBG(
- "Got %d bytes of message from MGMTD Backend adapter '%s'",
- bytes_read, adapter->name);
- if (bytes_read <= 0) {
- if (bytes_read == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_be_adapter_register_event(
- adapter, MGMTD_BE_CONN_READ);
- return;
- }
-
- if (!bytes_read) {
- /* Looks like connection closed */
- MGMTD_BE_ADAPTER_ERR(
- "Got error (%d) while reading from MGMTD Backend adapter '%s'. Err: '%s'",
- bytes_read, adapter->name,
- safe_strerror(errno));
- mgmt_be_adapter_disconnect(adapter);
- return;
- }
- break;
- }
-
- total_bytes += bytes_read;
- bytes_left -= bytes_read;
- }
-
- /*
- * Check if we would have read incomplete messages or not.
- */
- stream_set_getp(adapter->ibuf_work, 0);
- total_bytes = 0;
- msg_cnt = 0;
- bytes_left = stream_get_endp(adapter->ibuf_work);
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
- msg_hdr =
- (struct mgmt_be_msg_hdr *)(STREAM_DATA(
- adapter->ibuf_work)
- + total_bytes);
- if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) {
- /* Corrupted buffer. Force disconnect?? */
- MGMTD_BE_ADAPTER_ERR(
- "Received corrupted buffer from MGMTD Backend client.");
- mgmt_be_adapter_disconnect(adapter);
- return;
- }
- if (msg_hdr->len > bytes_left)
- break;
-
- total_bytes += msg_hdr->len;
- bytes_left -= msg_hdr->len;
- msg_cnt++;
- }
-
- if (bytes_left > 0)
- incomplete = true;
-
- /*
- * We would have read one or several messages.
- * Schedule processing them now.
- */
- msg_hdr = (struct mgmt_be_msg_hdr *)(STREAM_DATA(adapter->ibuf_work)
- + total_bytes);
- stream_set_endp(adapter->ibuf_work, total_bytes);
- stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work);
- adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- if (incomplete) {
- stream_put(adapter->ibuf_work, msg_hdr, bytes_left);
- stream_set_endp(adapter->ibuf_work, bytes_left);
+ rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_be);
+ if (rv == MSR_DISCONNECT) {
+ mgmt_be_adapter_disconnect(adapter);
+ return;
}
-
- if (msg_cnt)
+ if (rv == MSR_SCHED_BOTH)
mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG);
-
mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ);
}
static void mgmt_be_adapter_write(struct thread *thread)
{
- int bytes_written = 0;
- int processed = 0;
- int msg_size = 0;
- struct stream *s = NULL;
- struct stream *free = NULL;
- struct mgmt_be_client_adapter *adapter;
+ struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread);
+ enum mgmt_msg_wsched rv;
- adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd >= 0);
-
- /* Ensure pushing any pending write buffer to FIFO */
- if (adapter->obuf_work) {
- stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
- adapter->obuf_work = NULL;
- }
-
- for (s = stream_fifo_head(adapter->obuf_fifo);
- s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE;
- s = stream_fifo_head(adapter->obuf_fifo)) {
- /* msg_size = (int)stream_get_size(s); */
- msg_size = (int)STREAM_READABLE(s);
- bytes_written = stream_flush(s, adapter->conn_fd);
- if (bytes_written == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_be_adapter_register_event(adapter,
- MGMTD_BE_CONN_WRITE);
- return;
- } else if (bytes_written != msg_size) {
- MGMTD_BE_ADAPTER_ERR(
- "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'",
- msg_size, bytes_written, safe_strerror(errno));
- if (bytes_written > 0) {
- stream_forward_getp(s, (size_t)bytes_written);
- stream_pulldown(s);
- mgmt_be_adapter_register_event(
- adapter, MGMTD_BE_CONN_WRITE);
- return;
- }
- mgmt_be_adapter_disconnect(adapter);
- return;
- }
-
- free = stream_fifo_pop(adapter->obuf_fifo);
- stream_free(free);
- MGMTD_BE_ADAPTER_DBG(
- "Wrote %d bytes of message to MGMTD Backend client socket.'",
- bytes_written);
- processed++;
- }
-
- if (s) {
+ rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_be);
+ if (rv == MSW_SCHED_STREAM)
+ mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_WRITE);
+ else if (rv == MSW_DISCONNECT)
+ mgmt_be_adapter_disconnect(adapter);
+ else if (rv == MSW_SCHED_WRITES_OFF) {
mgmt_be_adapter_writes_off(adapter);
mgmt_be_adapter_register_event(adapter,
- MGMTD_BE_CONN_WRITES_ON);
- }
+ MGMTD_BE_CONN_WRITES_ON);
+ } else
+ assert(rv == MSW_SCHED_NONE);
}
static void mgmt_be_adapter_resume_writes(struct thread *thread)
assert(adapter->conn_read_ev);
break;
case MGMTD_BE_CONN_WRITE:
+ if (adapter->conn_write_ev)
+ MGMTD_BE_ADAPTER_DBG(
+ "write ready notify already set for client %s",
+ adapter->name);
+ else
+ MGMTD_BE_ADAPTER_DBG(
+ "scheduling write ready notify for client %s",
+ adapter->name);
thread_add_write(mgmt_be_adapter_tm, mgmt_be_adapter_write,
adapter, adapter->conn_fd, &adapter->conn_write_ev);
assert(adapter->conn_write_ev);
(*adapter)->refcount--;
if (!(*adapter)->refcount) {
mgmt_be_adapters_del(&mgmt_be_adapters, *adapter);
-
- stream_fifo_free((*adapter)->ibuf_fifo);
- stream_free((*adapter)->ibuf_work);
- stream_fifo_free((*adapter)->obuf_fifo);
- stream_free((*adapter)->obuf_work);
-
THREAD_OFF((*adapter)->conn_init_ev);
THREAD_OFF((*adapter)->conn_read_ev);
THREAD_OFF((*adapter)->conn_write_ev);
THREAD_OFF((*adapter)->conn_writes_on);
THREAD_OFF((*adapter)->proc_msg_ev);
+ mgmt_msg_destroy(&(*adapter)->mstate);
XFREE(MTYPE_MGMTD_BE_ADPATER, *adapter);
}
memcpy(&adapter->conn_su, from, sizeof(adapter->conn_su));
snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d",
adapter->conn_fd);
- adapter->ibuf_fifo = stream_fifo_new();
- adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- adapter->obuf_fifo = stream_fifo_new();
- /* adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); */
- adapter->obuf_work = NULL;
+ mgmt_msg_init(&adapter->mstate, MGMTD_BE_MAX_NUM_MSG_PROC,
+ MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN,
+ "BE-adapter");
mgmt_be_adapter_lock(adapter);
mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ);
vty_out(vty, " Conn-FD: \t\t\t%d\n", adapter->conn_fd);
vty_out(vty, " Client-Id: \t\t\t%d\n", adapter->id);
vty_out(vty, " Ref-Count: \t\t\t%u\n", adapter->refcount);
- vty_out(vty, " Msg-Sent: \t\t\t%u\n", adapter->num_msg_tx);
- vty_out(vty, " Msg-Recvd: \t\t\t%u\n", adapter->num_msg_rx);
+ vty_out(vty, " Msg-Recvd: \t\t\t%" PRIu64 "\n",
+ adapter->mstate.nrxm);
+ vty_out(vty, " Bytes-Recvd: \t\t%" PRIu64 "\n",
+ adapter->mstate.nrxb);
+ vty_out(vty, " Msg-Sent: \t\t\t%" PRIu64 "\n",
+ adapter->mstate.ntxm);
+ vty_out(vty, " Bytes-Sent: \t\t%" PRIu64 "\n",
+ adapter->mstate.ntxb);
}
vty_out(vty, " Total: %d\n",
(int)mgmt_be_adapters_count(&mgmt_be_adapters));
#ifndef _FRR_MGMTD_BE_ADAPTER_H_
#define _FRR_MGMTD_BE_ADAPTER_H_
-#include "mgmtd/mgmt_defines.h"
#include "mgmt_be_client.h"
+#include "mgmt_msg.h"
+#include "mgmtd/mgmt_defines.h"
#include "mgmtd/mgmt_ds.h"
#define MGMTD_BE_CONN_INIT_DELAY_MSEC 50
char xpath_reg[MGMTD_MAX_NUM_XPATH_REG][MGMTD_MAX_XPATH_LEN];
/* IO streams for read and write */
- /* pthread_mutex_t ibuf_mtx; */
- struct stream_fifo *ibuf_fifo;
- /* pthread_mutex_t obuf_mtx; */
- struct stream_fifo *obuf_fifo;
-
- /* Private I/O buffers */
- struct stream *ibuf_work;
- struct stream *obuf_work;
- uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN];
-
- /* Buffer of data waiting to be written to client. */
- /* struct buffer *wb; */
+ struct mgmt_msg_state mstate;
int refcount;
- uint32_t num_msg_tx;
- uint32_t num_msg_rx;
/*
* List of config items that should be sent to the
zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__)
#endif /* REDIRECT_DEBUG_TO_STDERR */
-static int mgmt_be_listen_fd;
+static int mgmt_be_listen_fd = -1;
static struct thread_master *mgmt_be_listen_tm;
static struct thread *mgmt_be_listen_ev;
static void mgmt_be_server_register_event(enum mgmt_be_event event);
#include "network.h"
#include "libfrr.h"
#include "mgmt_fe_client.h"
+#include "mgmt_msg.h"
#include "mgmt_pb.h"
#include "hash.h"
#include "jhash.h"
{
MGMTD_FE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name);
UNSET_FLAG(adapter->flags, MGMTD_FE_ADAPTER_FLAGS_WRITES_OFF);
- if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo))
- mgmt_fe_adapter_sched_msg_write(adapter);
+ mgmt_fe_adapter_sched_msg_write(adapter);
}
static inline void
mgmt_fe_adapter_send_msg(struct mgmt_fe_client_adapter *adapter,
Mgmtd__FeMessage *fe_msg)
{
- size_t msg_size;
- uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN];
- struct mgmt_fe_msg *msg;
-
- if (adapter->conn_fd < 0) {
- MGMTD_FE_ADAPTER_ERR("Connection already reset");
- return -1;
- }
-
- msg_size = mgmtd__fe_message__get_packed_size(fe_msg);
- msg_size += MGMTD_FE_MSG_HDR_LEN;
- if (msg_size > sizeof(msg_buf)) {
- MGMTD_FE_ADAPTER_ERR(
- "Message size %d more than max size'%d. Not sending!'",
- (int)msg_size, (int)sizeof(msg_buf));
+ if (adapter->conn_fd == -1) {
+ MGMTD_FE_ADAPTER_DBG("can't send message on closed connection");
return -1;
}
- msg = (struct mgmt_fe_msg *)msg_buf;
- msg->hdr.marker = MGMTD_FE_MSG_MARKER;
- msg->hdr.len = (uint16_t)msg_size;
- mgmtd__fe_message__pack(fe_msg, msg->payload);
-
- if (!adapter->obuf_work)
- adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) {
- stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
- adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- }
- stream_write(adapter->obuf_work, (void *)msg_buf, msg_size);
-
+ int rv = mgmt_msg_send_msg(
+ &adapter->mstate, fe_msg,
+ mgmtd__fe_message__get_packed_size(fe_msg),
+ (size_t(*)(void *, void *))mgmtd__fe_message__pack,
+ mgmt_debug_fe);
mgmt_fe_adapter_sched_msg_write(adapter);
- adapter->num_msg_tx++;
- return 0;
+ return rv;
}
static int
"Failed to create a Configuration session!");
return 0;
}
+ MGMTD_FE_ADAPTER_DBG(
+ "Created txn %llu for session %llu for COMMIT-CFG-REQ",
+ session->cfg_txn_id, session->session_id);
}
return 0;
}
-static uint16_t
-mgmt_fe_adapter_process_msg(struct mgmt_fe_client_adapter *adapter,
- uint8_t *msg_buf, uint16_t bytes_read)
+static void mgmt_fe_adapter_process_msg(void *user_ctx, uint8_t *data,
+ size_t len)
{
+ struct mgmt_fe_client_adapter *adapter = user_ctx;
Mgmtd__FeMessage *fe_msg;
- struct mgmt_fe_msg *msg;
- uint16_t bytes_left;
- uint16_t processed = 0;
-
- MGMTD_FE_ADAPTER_DBG(
- "Have %u bytes of messages from client '%s' to process",
- bytes_read, adapter->name);
-
- bytes_left = bytes_read;
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;
- bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
- msg = (struct mgmt_fe_msg *)msg_buf;
- if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) {
- MGMTD_FE_ADAPTER_DBG(
- "Marker not found in message from MGMTD Frontend adapter '%s'",
- adapter->name);
- break;
- }
-
- if (bytes_left < msg->hdr.len) {
- MGMTD_FE_ADAPTER_DBG(
- "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend adapter '%s'",
- bytes_left, msg->hdr.len, adapter->name);
- break;
- }
-
- fe_msg = mgmtd__fe_message__unpack(
- NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN),
- msg->payload);
- if (!fe_msg) {
- MGMTD_FE_ADAPTER_DBG(
- "Failed to decode %d bytes from MGMTD Frontend adapter '%s'",
- msg->hdr.len, adapter->name);
- continue;
- }
+ fe_msg = mgmtd__fe_message__unpack(NULL, len, data);
+ if (!fe_msg) {
MGMTD_FE_ADAPTER_DBG(
- "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend adapter '%s'",
- msg->hdr.len, fe_msg->message_case,
- fe_msg->message_case, adapter->name);
-
- (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg);
-
- mgmtd__fe_message__free_unpacked(fe_msg, NULL);
- processed++;
- adapter->num_msg_rx++;
+ "Failed to decode %zu bytes for adapter: %s", len,
+ adapter->name);
+ return;
}
-
- return processed;
+ MGMTD_FE_ADAPTER_DBG(
+ "Decoded %zu bytes of message: %u from adapter: %s", len,
+ fe_msg->message_case, adapter->name);
+ (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg);
+ mgmtd__fe_message__free_unpacked(fe_msg, NULL);
}
static void mgmt_fe_adapter_proc_msgbufs(struct thread *thread)
{
- struct mgmt_fe_client_adapter *adapter;
- struct stream *work;
- int processed = 0;
+ struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread);
- adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd >= 0);
-
- MGMTD_FE_ADAPTER_DBG("Have %d ibufs for client '%s' to process",
- (int)stream_fifo_count_safe(adapter->ibuf_fifo),
- adapter->name);
-
- for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) {
- work = stream_fifo_pop_safe(adapter->ibuf_fifo);
- if (!work)
- break;
-
- processed += mgmt_fe_adapter_process_msg(
- adapter, STREAM_DATA(work), stream_get_endp(work));
-
- if (work != adapter->ibuf_work) {
- /* Free it up */
- stream_free(work);
- } else {
- /* Reset stream buffer for next read */
- stream_reset(work);
- }
- }
-
- /*
- * If we have more to process, reschedule for processing it.
- */
- if (stream_fifo_head(adapter->ibuf_fifo))
+ if (mgmt_msg_procbufs(&adapter->mstate, mgmt_fe_adapter_process_msg,
+ adapter, mgmt_debug_fe))
mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG);
}
static void mgmt_fe_adapter_read(struct thread *thread)
{
- struct mgmt_fe_client_adapter *adapter;
- int bytes_read, msg_cnt;
- size_t total_bytes, bytes_left;
- struct mgmt_fe_msg_hdr *msg_hdr;
- bool incomplete = false;
+ struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread);
+ enum mgmt_msg_rsched rv;
- adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd);
-
- total_bytes = 0;
- bytes_left = STREAM_SIZE(adapter->ibuf_work)
- - stream_get_endp(adapter->ibuf_work);
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
- bytes_read = stream_read_try(adapter->ibuf_work, adapter->conn_fd,
- bytes_left);
- MGMTD_FE_ADAPTER_DBG(
- "Got %d bytes of message from MGMTD Frontend adapter '%s'",
- bytes_read, adapter->name);
- if (bytes_read <= 0) {
- if (bytes_read == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_fe_adapter_register_event(
- adapter, MGMTD_FE_CONN_READ);
- return;
- }
-
- if (!bytes_read) {
- /* Looks like connection closed */
- MGMTD_FE_ADAPTER_ERR(
- "Got error (%d) while reading from MGMTD Frontend adapter '%s'. Err: '%s'",
- bytes_read, adapter->name,
- safe_strerror(errno));
- mgmt_fe_adapter_disconnect(adapter);
- return;
- }
- break;
- }
-
- total_bytes += bytes_read;
- bytes_left -= bytes_read;
- }
-
- /*
- * Check if we would have read incomplete messages or not.
- */
- stream_set_getp(adapter->ibuf_work, 0);
- total_bytes = 0;
- msg_cnt = 0;
- bytes_left = stream_get_endp(adapter->ibuf_work);
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
- msg_hdr =
- (struct mgmt_fe_msg_hdr *)(STREAM_DATA(
- adapter->ibuf_work)
- + total_bytes);
- if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) {
- /* Corrupted buffer. Force disconnect?? */
- MGMTD_FE_ADAPTER_ERR(
- "Received corrupted buffer from MGMTD frontend client.");
- mgmt_fe_adapter_disconnect(adapter);
- return;
- }
- if (msg_hdr->len > bytes_left)
- break;
-
- MGMTD_FE_ADAPTER_DBG("Got message (len: %u) from client '%s'",
- msg_hdr->len, adapter->name);
-
- total_bytes += msg_hdr->len;
- bytes_left -= msg_hdr->len;
- msg_cnt++;
- }
-
- if (bytes_left > 0)
- incomplete = true;
- /*
- * We would have read one or several messages.
- * Schedule processing them now.
- */
- msg_hdr = (struct mgmt_fe_msg_hdr *)(STREAM_DATA(adapter->ibuf_work)
- + total_bytes);
- stream_set_endp(adapter->ibuf_work, total_bytes);
- stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work);
- adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- if (incomplete) {
- stream_put(adapter->ibuf_work, msg_hdr, bytes_left);
- stream_set_endp(adapter->ibuf_work, bytes_left);
+ rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe);
+ if (rv == MSR_DISCONNECT) {
+ mgmt_fe_adapter_disconnect(adapter);
+ return;
}
-
- if (msg_cnt)
+ if (rv == MSR_SCHED_BOTH)
mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG);
-
mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ);
}
static void mgmt_fe_adapter_write(struct thread *thread)
{
- int bytes_written = 0;
- int processed = 0;
- int msg_size = 0;
- struct stream *s = NULL;
- struct stream *free = NULL;
- struct mgmt_fe_client_adapter *adapter;
-
- adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd);
-
- /* Ensure pushing any pending write buffer to FIFO */
- if (adapter->obuf_work) {
- stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
- adapter->obuf_work = NULL;
- }
-
- for (s = stream_fifo_head(adapter->obuf_fifo);
- s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE;
- s = stream_fifo_head(adapter->obuf_fifo)) {
- /* msg_size = (int)stream_get_size(s); */
- msg_size = (int)STREAM_READABLE(s);
- bytes_written = stream_flush(s, adapter->conn_fd);
- if (bytes_written == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_fe_adapter_register_event(
- adapter, MGMTD_FE_CONN_WRITE);
- return;
- } else if (bytes_written != msg_size) {
- MGMTD_FE_ADAPTER_ERR(
- "Could not write all %d bytes (wrote: %d) to MGMTD Frontend client socket. Err: '%s'",
- msg_size, bytes_written, safe_strerror(errno));
- if (bytes_written > 0) {
- stream_forward_getp(s, (size_t)bytes_written);
- stream_pulldown(s);
- mgmt_fe_adapter_register_event(
- adapter, MGMTD_FE_CONN_WRITE);
- return;
- }
- mgmt_fe_adapter_disconnect(adapter);
- return;
- }
-
- free = stream_fifo_pop(adapter->obuf_fifo);
- stream_free(free);
- MGMTD_FE_ADAPTER_DBG(
- "Wrote %d bytes of message to MGMTD Frontend client socket.'",
- bytes_written);
- processed++;
- }
+ struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread);
+ enum mgmt_msg_wsched rv;
- if (s) {
+ rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe);
+ if (rv == MSW_SCHED_STREAM)
+ mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_WRITE);
+ else if (rv == MSW_DISCONNECT)
+ mgmt_fe_adapter_disconnect(adapter);
+ else if (rv == MSW_SCHED_WRITES_OFF) {
mgmt_fe_adapter_writes_off(adapter);
mgmt_fe_adapter_register_event(adapter,
- MGMTD_FE_CONN_WRITES_ON);
- }
+ MGMTD_FE_CONN_WRITES_ON);
+ } else
+ assert(rv == MSW_SCHED_NONE);
}
static void mgmt_fe_adapter_resume_writes(struct thread *thread)
struct mgmt_fe_client_adapter *adapter;
adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd);
+ assert(adapter && adapter->conn_fd != -1);
mgmt_fe_adapter_writes_on(adapter);
}
(*adapter)->refcount--;
if (!(*adapter)->refcount) {
mgmt_fe_adapters_del(&mgmt_fe_adapters, *adapter);
-
- stream_fifo_free((*adapter)->ibuf_fifo);
- stream_free((*adapter)->ibuf_work);
- stream_fifo_free((*adapter)->obuf_fifo);
- stream_free((*adapter)->obuf_work);
-
THREAD_OFF((*adapter)->conn_read_ev);
THREAD_OFF((*adapter)->conn_write_ev);
THREAD_OFF((*adapter)->proc_msg_ev);
THREAD_OFF((*adapter)->conn_writes_on);
+ mgmt_msg_destroy(&(*adapter)->mstate);
XFREE(MTYPE_MGMTD_FE_ADPATER, *adapter);
}
snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d",
adapter->conn_fd);
mgmt_fe_sessions_init(&adapter->fe_sessions);
- adapter->ibuf_fifo = stream_fifo_new();
- adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- adapter->obuf_fifo = stream_fifo_new();
- /* adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); */
- adapter->obuf_work = NULL;
+
+ mgmt_msg_init(&adapter->mstate, MGMTD_FE_MAX_NUM_MSG_PROC,
+ MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN,
+ "FE-adapter");
mgmt_fe_adapter_lock(adapter);
mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ);
}
vty_out(vty, " Total-Sessions: \t\t\t%d\n",
(int)mgmt_fe_sessions_count(&adapter->fe_sessions));
- vty_out(vty, " Msg-Sent: \t\t\t\t%u\n", adapter->num_msg_tx);
- vty_out(vty, " Msg-Recvd: \t\t\t\t%u\n",
- adapter->num_msg_rx);
+ vty_out(vty, " Msg-Recvd: \t\t\t\t%" PRIu64 "\n",
+ adapter->mstate.nrxm);
+ vty_out(vty, " Bytes-Recvd: \t\t\t%" PRIu64 "\n",
+ adapter->mstate.nrxb);
+ vty_out(vty, " Msg-Sent: \t\t\t\t%" PRIu64 "\n",
+ adapter->mstate.ntxm);
+ vty_out(vty, " Bytes-Sent: \t\t\t%" PRIu64 "\n",
+ adapter->mstate.ntxb);
}
vty_out(vty, " Total: %d\n",
(int)mgmt_fe_adapters_count(&mgmt_fe_adapters));
#ifndef _FRR_MGMTD_FE_ADAPTER_H_
#define _FRR_MGMTD_FE_ADAPTER_H_
+#include "mgmt_fe_client.h"
+#include "mgmt_msg.h"
+#include "mgmtd/mgmt_defines.h"
+
struct mgmt_fe_client_adapter;
struct mgmt_master;
struct mgmt_fe_sessions_head fe_sessions;
/* IO streams for read and write */
- /* pthread_mutex_t ibuf_mtx; */
- struct stream_fifo *ibuf_fifo;
- /* pthread_mutex_t obuf_mtx; */
- struct stream_fifo *obuf_fifo;
-
- /* Private I/O buffers */
- struct stream *ibuf_work;
- struct stream *obuf_work;
+ struct mgmt_msg_state mstate;
int refcount;
- uint32_t num_msg_tx;
- uint32_t num_msg_rx;
struct mgmt_commit_stats cmt_stats;
struct mgmt_setcfg_stats setcfg_stats;
zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__)
#endif /* REDIRECT_DEBUG_TO_STDERR */
-static int mgmt_fe_listen_fd;
+static int mgmt_fe_listen_fd = -1;
static struct thread_master *mgmt_fe_listen_tm;
static struct thread *mgmt_fe_listen_ev;
static void mgmt_fe_server_register_event(enum mgmt_fe_event event);
return -1;
}
+ MGMTD_TXN_DBG("Created initial txn %llu for BE connection %s",
+ txn->txn_id, adapter->name);
/*
* Set the changeset for transaction to commit and trigger the
* commit request.
return 0;
}
-int mgmt_txn_notify_be_cfg_validate_reply(
- uint64_t txn_id, bool success, uint64_t batch_ids[],
- size_t num_batch_ids, char *error_if_any,
- struct mgmt_be_client_adapter *adapter)
-{
- struct mgmt_txn_ctx *txn;
- struct mgmt_txn_be_cfg_batch *cfg_btch;
- struct mgmt_commit_cfg_req *cmtcfg_req = NULL;
- size_t indx;
-
- txn = mgmt_txn_id2ctx(txn_id);
- if (!txn || txn->type != MGMTD_TXN_TYPE_CONFIG)
- return -1;
-
- assert(txn->commit_cfg_req);
- cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg;
-
- if (!success) {
- MGMTD_TXN_ERR(
- "CFGDATA_VALIDATE_REQ sent to '%s' failed for Txn %p, Batches [0x%llx - 0x%llx], Err: %s",
- adapter->name, txn, (unsigned long long)batch_ids[0],
- (unsigned long long)batch_ids[num_batch_ids - 1],
- error_if_any ? error_if_any : "None");
- mgmt_txn_send_commit_cfg_reply(
- txn, MGMTD_INTERNAL_ERROR,
- "Internal error! Failed to validate config data on backend!");
- return 0;
- }
-
- for (indx = 0; indx < num_batch_ids; indx++) {
- cfg_btch = mgmt_txn_cfgbatch_id2ctx(txn, batch_ids[indx]);
- if (cfg_btch->txn != txn)
- return -1;
- mgmt_move_txn_cfg_batch_to_next(
- cmtcfg_req, cfg_btch,
- &cmtcfg_req->curr_batches[adapter->id],
- &cmtcfg_req->next_batches[adapter->id], true,
- MGMTD_COMMIT_PHASE_APPLY_CFG);
- }
-
- mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req);
-
- return 0;
-}
-
-extern int
-mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success,
+int mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success,
uint64_t batch_ids[],
size_t num_batch_ids, char *error_if_any,
struct mgmt_be_client_adapter *adapter)
return -1;
}
+ MGMTD_TXN_DBG("Created rollback txn %llu", txn->txn_id);
+
/*
* Set the changeset for transaction to commit and trigger the commit
* request.