diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/lib_vty.c | 3 | ||||
| -rw-r--r-- | lib/mgmt.proto | 345 | ||||
| -rw-r--r-- | lib/mgmt_fe_client.c | 1340 | ||||
| -rw-r--r-- | lib/mgmt_fe_client.h | 358 | ||||
| -rw-r--r-- | lib/mgmt_pb.h | 25 | ||||
| -rw-r--r-- | lib/northbound.h | 6 | ||||
| -rw-r--r-- | lib/northbound_cli.c | 4 | ||||
| -rw-r--r-- | lib/subdir.am | 21 | ||||
| -rw-r--r-- | lib/vty.c | 498 | ||||
| -rw-r--r-- | lib/vty.h | 24 |
10 files changed, 2620 insertions, 4 deletions
diff --git a/lib/lib_vty.c b/lib/lib_vty.c index 2c8f2e9047..91318e0d95 100644 --- a/lib/lib_vty.c +++ b/lib/lib_vty.c @@ -243,6 +243,9 @@ DEFUN_NOSH(end_config, end_config_cmd, "XFRR_end_configuration", zlog_info("Configuration Read in Took: %s", readin_time_str); + if (vty_mgmt_fe_enabled()) + vty_mgmt_send_commit_config(vty, false, false); + if (callback.end_config) (*callback.end_config)(); diff --git a/lib/mgmt.proto b/lib/mgmt.proto new file mode 100644 index 0000000000..4376794105 --- /dev/null +++ b/lib/mgmt.proto @@ -0,0 +1,345 @@ +// SPDX-License-Identifier: ISC +// +// mgmt.proto +// +// @copyright Copyright (C) 2021 Vmware, Inc. +// +// @author Pushpasis Sarkar <spushpasis@vmware.com> +// + +syntax = "proto2"; + +// +// Protobuf definitions pertaining to the MGMTD component. +// + +package mgmtd; + +// +// Common Sub-Messages +// + +message YangDataXPath { + required string xpath = 1; +} + +message YangDataValue { + oneof value { + // + // NOTE: For now let's use stringized value ONLY. + // We will enhance it later to pass native-format + // if needed. 
+ // + // bool bool_val = 2; + // double double_val = 3; + // float float_val = 4; + // string string_val = 5; + // bytes bytes_val = 6; + // int32 int32_val = 7; + // int64 int64_val = 8; + // uint32 uint32_val = 9; + // uint64 uint64_val = 10; + // int32 int8_val = 11; + // uint32 uint8_val = 12; + // int32 int16_val = 13; + // uint32 uint16_val = 14; + string encoded_str_val = 100; + } +} + +message YangData { + required string xpath = 1; + optional YangDataValue value = 2; +} + +enum CfgDataReqType { + REQ_TYPE_NONE = 0; + SET_DATA = 1; + DELETE_DATA = 2; +} + +message YangCfgDataReq { + required YangData data = 1; + required CfgDataReqType req_type = 2; +} + +message YangGetDataReq { + required YangData data = 1; + required int64 next_indx = 2; +} + +// +// Backend Interface Messages +// +message BeSubscribeReq { + required string client_name = 1; + required bool subscribe_xpaths = 2; + repeated string xpath_reg = 3; +} + +message BeSubscribeReply { + required bool success = 1; +} + +message BeTxnReq { + required uint64 txn_id = 1; + required bool create = 2; +} + +message BeTxnReply { + required uint64 txn_id = 1; + required bool create = 2; + required bool success = 3; +} + +message BeCfgDataCreateReq { + required uint64 txn_id = 1; + required uint64 batch_id = 2; + repeated YangCfgDataReq data_req = 3; + required bool end_of_data = 4; +} + +message BeCfgDataCreateReply { + required uint64 txn_id = 1; + required uint64 batch_id = 2; + required bool success = 3; + optional string error_if_any = 4; +} + +message BeCfgDataValidateReq { + required uint64 txn_id = 1; + repeated uint64 batch_ids = 2; +} + +message BeCfgDataValidateReply { + required uint64 txn_id = 1; + repeated uint64 batch_ids = 2; + required bool success = 3; + optional string error_if_any = 4; +} + +message BeCfgDataApplyReq { + required uint64 txn_id = 1; +} + +message BeCfgDataApplyReply { + required uint64 txn_id = 1; + repeated uint64 batch_ids = 2; + required bool success = 3; + optional 
string error_if_any = 4; +} + +message BeOperDataGetReq { + required uint64 txn_id = 1; + required uint64 batch_id = 2; + repeated YangGetDataReq data = 3; +} + +message YangDataReply { + repeated YangData data = 1; + required int64 next_indx = 2; +} + +message BeOperDataGetReply { + required uint64 txn_id = 1; + required uint64 batch_id = 2; + required bool success = 3; + optional string error = 4; + optional YangDataReply data = 5; +} + +message BeOperDataNotify { + required YangDataReply data = 5; +} + +message BeConfigCmdReq { + required string cmd = 1; +} + +message BeConfigCmdReply { + required bool success = 1; + required string error_if_any = 2; +} + +message BeShowCmdReq { + required string cmd = 1; +} + +message BeShowCmdReply { + required bool success = 1; + required string cmd_ouput = 2; +} + +// +// Any message on the MGMTD Backend Interface. +// +message BeMessage { + oneof message { + BeSubscribeReq subscr_req = 2; + BeSubscribeReply subscr_reply = 3; + BeTxnReq txn_req = 4; + BeTxnReply txn_reply = 5; + BeCfgDataCreateReq cfg_data_req = 6; + BeCfgDataCreateReply cfg_data_reply = 7; + BeCfgDataValidateReq cfg_validate_req = 8; + BeCfgDataValidateReply cfg_validate_reply = 9; + BeCfgDataApplyReq cfg_apply_req = 10; + BeCfgDataApplyReply cfg_apply_reply = 11; + BeOperDataGetReq get_req = 12; + BeOperDataGetReply get_reply = 13; + BeOperDataNotify notify_data = 14; + BeConfigCmdReq cfg_cmd_req = 15; + BeConfigCmdReply cfg_cmd_reply = 16; + BeShowCmdReq show_cmd_req = 17; + BeShowCmdReply show_cmd_reply = 18; + } +} + + +// +// Frontend Interface Messages +// + +message FeRegisterReq { + required string client_name = 1; +} + +message FeSessionReq { + required bool create = 1; + oneof id { + uint64 client_conn_id = 2; // Applicable for create request only + uint64 session_id = 3; // Applicable for delete request only + } +} + +message FeSessionReply { + required bool create = 1; + required bool success = 2; + optional uint64 client_conn_id = 3; // 
Applicable for create request only + required uint64 session_id = 4; +} + +enum DatastoreId { + DS_NONE = 0; + RUNNING_DS = 1; + CANDIDATE_DS = 2; + OPERATIONAL_DS = 3; + STARTUP_DS = 4; +} + +message FeLockDsReq { + required uint64 session_id = 1; + required uint64 req_id = 2; + required DatastoreId ds_id = 3; + required bool lock = 4; +} + +message FeLockDsReply { + required uint64 session_id = 1; + required uint64 req_id = 2; + required DatastoreId ds_id = 3; + required bool lock = 4; + required bool success = 5; + optional string error_if_any = 6; +} + +message FeSetConfigReq { + required uint64 session_id = 1; + required DatastoreId ds_id = 2; + required uint64 req_id = 3; + repeated YangCfgDataReq data = 4; + required bool implicit_commit = 5; + required DatastoreId commit_ds_id = 6; +} + +message FeSetConfigReply { + required uint64 session_id = 1; + required DatastoreId ds_id = 2; + required uint64 req_id = 3; + required bool success = 4; + optional string error_if_any = 5; +} + +message FeCommitConfigReq { + required uint64 session_id = 1; + required DatastoreId src_ds_id = 2; + required DatastoreId dst_ds_id = 3; + required uint64 req_id = 4; + required bool validate_only = 5; + required bool abort = 6; +} + +message FeCommitConfigReply { + required uint64 session_id = 1; + required DatastoreId src_ds_id = 2; + required DatastoreId dst_ds_id = 3; + required uint64 req_id = 4; + required bool validate_only = 5; + required bool success = 6; + required bool abort = 7; + optional string error_if_any = 8; +} + +message FeGetConfigReq { + required uint64 session_id = 1; + required DatastoreId ds_id = 2; + required uint64 req_id = 3; + repeated YangGetDataReq data = 4; +} + +message FeGetConfigReply { + required uint64 session_id = 1; + required DatastoreId ds_id = 2; + required uint64 req_id = 3; + required bool success = 4; + optional string error_if_any = 5; + optional YangDataReply data = 6; +} + +message FeGetDataReq { + required uint64 session_id = 1; + 
required DatastoreId ds_id = 2; + required uint64 req_id = 3; + repeated YangGetDataReq data = 4; +} + +message FeGetDataReply { + required uint64 session_id = 1; + required DatastoreId ds_id = 2; + required uint64 req_id = 3; + required bool success = 4; + optional string error_if_any = 5; + optional YangDataReply data = 6; +} + +message FeNotifyDataReq { + repeated YangData data = 1; +} + +message FeRegisterNotifyReq { + required uint64 session_id = 1; + required DatastoreId ds_id = 2; + required bool register_req = 3; + required uint64 req_id = 4; + repeated YangDataXPath data_xpath = 5; +} + +message FeMessage { + oneof message { + FeRegisterReq register_req = 2; + FeSessionReq session_req = 3; + FeSessionReply session_reply = 4; + FeLockDsReq lockds_req = 5; + FeLockDsReply lockds_reply = 6; + FeSetConfigReq setcfg_req = 7; + FeSetConfigReply setcfg_reply = 8; + FeCommitConfigReq commcfg_req = 9; + FeCommitConfigReply commcfg_reply = 10; + FeGetConfigReq getcfg_req = 11; + FeGetConfigReply getcfg_reply = 12; + FeGetDataReq getdata_req = 13; + FeGetDataReply getdata_reply = 14; + FeNotifyDataReq notify_data_req = 15; + FeRegisterNotifyReq regnotify_req = 16; + } +} diff --git a/lib/mgmt_fe_client.c b/lib/mgmt_fe_client.c new file mode 100644 index 0000000000..22c3a06b9e --- /dev/null +++ b/lib/mgmt_fe_client.c @@ -0,0 +1,1340 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Frontend Client Library api interfaces + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#include <zebra.h> +#include "memory.h" +#include "libfrr.h" +#include "mgmt_fe_client.h" +#include "mgmt_pb.h" +#include "network.h" +#include "stream.h" +#include "sockopt.h" + +#ifdef REDIRECT_DEBUG_TO_STDERR +#define MGMTD_FE_CLIENT_DBG(fmt, ...) \ + fprintf(stderr, "%s: " fmt "\n", __func__, ##__VA_ARGS__) +#define MGMTD_FE_CLIENT_ERR(fmt, ...) 
\ + fprintf(stderr, "%s: ERROR, " fmt "\n", __func__, ##__VA_ARGS__) +#else /* REDIRECT_DEBUG_TO_STDERR */ +#define MGMTD_FE_CLIENT_DBG(fmt, ...) \ + do { \ + if (mgmt_debug_fe_client) \ + zlog_debug("%s: " fmt, __func__, ##__VA_ARGS__); \ + } while (0) +#define MGMTD_FE_CLIENT_ERR(fmt, ...) \ + zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) +#endif /* REDIRECT_DEBUG_TO_STDERR */ + +struct mgmt_fe_client_ctx; + +PREDECL_LIST(mgmt_sessions); + +struct mgmt_fe_client_session { + uint64_t client_id; + uint64_t session_id; + struct mgmt_fe_client_ctx *client_ctx; + uintptr_t user_ctx; + + struct mgmt_sessions_item list_linkage; +}; + +DECLARE_LIST(mgmt_sessions, struct mgmt_fe_client_session, list_linkage); + +DEFINE_MTYPE_STATIC(LIB, MGMTD_FE_SESSION, "MGMTD Frontend session"); + +struct mgmt_fe_client_ctx { + int conn_fd; + struct thread_master *tm; + struct thread *conn_retry_tmr; + struct thread *conn_read_ev; + struct thread *conn_write_ev; + struct thread *conn_writes_on; + struct thread *msg_proc_ev; + uint32_t flags; + uint32_t num_msg_tx; + uint32_t num_msg_rx; + + struct stream_fifo *ibuf_fifo; + struct stream *ibuf_work; + struct stream_fifo *obuf_fifo; + struct stream *obuf_work; + + struct mgmt_fe_client_params client_params; + + struct mgmt_sessions_head client_sessions; +}; + +#define MGMTD_FE_CLIENT_FLAGS_WRITES_OFF (1U << 0) + +#define FOREACH_SESSION_IN_LIST(client_ctx, session) \ + frr_each_safe (mgmt_sessions, &(client_ctx)->client_sessions, (session)) + +static bool mgmt_debug_fe_client; + +static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {0}; + +/* Forward declarations */ +static void +mgmt_fe_client_register_event(struct mgmt_fe_client_ctx *client_ctx, + enum mgmt_fe_event event); +static void mgmt_fe_client_schedule_conn_retry( + struct mgmt_fe_client_ctx *client_ctx, unsigned long intvl_secs); + +static struct mgmt_fe_client_session * +mgmt_fe_find_session_by_client_id(struct mgmt_fe_client_ctx *client_ctx, + uint64_t client_id) +{ + 
struct mgmt_fe_client_session *session; + + FOREACH_SESSION_IN_LIST (client_ctx, session) { + if (session->client_id == client_id) { + MGMTD_FE_CLIENT_DBG( + "Found session %p for client-id %llu.", session, + (unsigned long long)client_id); + return session; + } + } + + return NULL; +} + +static struct mgmt_fe_client_session * +mgmt_fe_find_session_by_session_id(struct mgmt_fe_client_ctx *client_ctx, + uint64_t session_id) +{ + struct mgmt_fe_client_session *session; + + FOREACH_SESSION_IN_LIST (client_ctx, session) { + if (session->session_id == session_id) { + MGMTD_FE_CLIENT_DBG( + "Found session %p for session-id %llu.", session, + (unsigned long long)session_id); + return session; + } + } + + return NULL; +} + +static void +mgmt_fe_server_disconnect(struct mgmt_fe_client_ctx *client_ctx, + bool reconnect) +{ + if (client_ctx->conn_fd) { + close(client_ctx->conn_fd); + client_ctx->conn_fd = 0; + } + + if (reconnect) + mgmt_fe_client_schedule_conn_retry( + client_ctx, + client_ctx->client_params.conn_retry_intvl_sec); +} + +static inline void +mgmt_fe_client_sched_msg_write(struct mgmt_fe_client_ctx *client_ctx) +{ + if (!CHECK_FLAG(client_ctx->flags, MGMTD_FE_CLIENT_FLAGS_WRITES_OFF)) + mgmt_fe_client_register_event(client_ctx, + MGMTD_FE_CONN_WRITE); +} + +static inline void +mgmt_fe_client_writes_on(struct mgmt_fe_client_ctx *client_ctx) +{ + MGMTD_FE_CLIENT_DBG("Resume writing msgs"); + UNSET_FLAG(client_ctx->flags, MGMTD_FE_CLIENT_FLAGS_WRITES_OFF); + if (client_ctx->obuf_work + || stream_fifo_count_safe(client_ctx->obuf_fifo)) + mgmt_fe_client_sched_msg_write(client_ctx); +} + +static inline void +mgmt_fe_client_writes_off(struct mgmt_fe_client_ctx *client_ctx) +{ + SET_FLAG(client_ctx->flags, MGMTD_FE_CLIENT_FLAGS_WRITES_OFF); + MGMTD_FE_CLIENT_DBG("Paused writing msgs"); +} + +static int +mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx, + Mgmtd__FeMessage *fe_msg) +{ + size_t msg_size; + uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN]; + struct 
mgmt_fe_msg *msg; + + if (client_ctx->conn_fd == 0) + return -1; + + msg_size = mgmtd__fe_message__get_packed_size(fe_msg); + msg_size += MGMTD_FE_MSG_HDR_LEN; + if (msg_size > sizeof(msg_buf)) { + MGMTD_FE_CLIENT_ERR( + "Message size %d more than max size'%d. Not sending!'", + (int)msg_size, (int)sizeof(msg_buf)); + return -1; + } + + msg = (struct mgmt_fe_msg *)msg_buf; + msg->hdr.marker = MGMTD_FE_MSG_MARKER; + msg->hdr.len = (uint16_t)msg_size; + mgmtd__fe_message__pack(fe_msg, msg->payload); + + if (!client_ctx->obuf_work) + client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); + if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) { + stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); + client_ctx->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); + } + stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size); + + mgmt_fe_client_sched_msg_write(client_ctx); + client_ctx->num_msg_tx++; + return 0; +} + +static void mgmt_fe_client_write(struct thread *thread) +{ + int bytes_written = 0; + int processed = 0; + int msg_size = 0; + struct stream *s = NULL; + struct stream *free = NULL; + struct mgmt_fe_client_ctx *client_ctx; + + client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + /* Ensure pushing any pending write buffer to FIFO */ + if (client_ctx->obuf_work) { + stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); + client_ctx->obuf_work = NULL; + } + + for (s = stream_fifo_head(client_ctx->obuf_fifo); + s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE; + s = stream_fifo_head(client_ctx->obuf_fifo)) { + /* msg_size = (int)stream_get_size(s); */ + msg_size = (int)STREAM_READABLE(s); + bytes_written = stream_flush(s, client_ctx->conn_fd); + if (bytes_written == -1 + && (errno == EAGAIN || errno == EWOULDBLOCK)) { + mgmt_fe_client_register_event( + client_ctx, MGMTD_FE_CONN_WRITE); + return; + } else if (bytes_written != msg_size) { + MGMTD_FE_CLIENT_ERR( + "Could not write all 
%d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", + msg_size, bytes_written, safe_strerror(errno)); + if (bytes_written > 0) { + stream_forward_getp(s, (size_t)bytes_written); + stream_pulldown(s); + mgmt_fe_client_register_event( + client_ctx, MGMTD_FE_CONN_WRITE); + return; + } + mgmt_fe_server_disconnect(client_ctx, true); + return; + } + + free = stream_fifo_pop(client_ctx->obuf_fifo); + stream_free(free); + MGMTD_FE_CLIENT_DBG( + "Wrote %d bytes of message to MGMTD Backend client socket.'", + bytes_written); + processed++; + } + + if (s) { + mgmt_fe_client_writes_off(client_ctx); + mgmt_fe_client_register_event(client_ctx, + MGMTD_FE_CONN_WRITES_ON); + } +} + +static void mgmt_fe_client_resume_writes(struct thread *thread) +{ + struct mgmt_fe_client_ctx *client_ctx; + + client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + mgmt_fe_client_writes_on(client_ctx); +} + +static int +mgmt_fe_send_register_req(struct mgmt_fe_client_ctx *client_ctx) +{ + Mgmtd__FeMessage fe_msg; + Mgmtd__FeRegisterReq rgstr_req; + + mgmtd__fe_register_req__init(&rgstr_req); + rgstr_req.client_name = client_ctx->client_params.name; + + mgmtd__fe_message__init(&fe_msg); + fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_REGISTER_REQ; + fe_msg.register_req = &rgstr_req; + + MGMTD_FE_CLIENT_DBG( + "Sending REGISTER_REQ message to MGMTD Frontend server"); + + return mgmt_fe_client_send_msg(client_ctx, &fe_msg); +} + +static int +mgmt_fe_send_session_req(struct mgmt_fe_client_ctx *client_ctx, + struct mgmt_fe_client_session *session, + bool create) +{ + Mgmtd__FeMessage fe_msg; + Mgmtd__FeSessionReq sess_req; + + mgmtd__fe_session_req__init(&sess_req); + sess_req.create = create; + if (create) { + sess_req.id_case = MGMTD__FE_SESSION_REQ__ID_CLIENT_CONN_ID; + sess_req.client_conn_id = session->client_id; + } else { + sess_req.id_case = MGMTD__FE_SESSION_REQ__ID_SESSION_ID; + sess_req.session_id = session->session_id; + 
} + + mgmtd__fe_message__init(&fe_msg); + fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_SESSION_REQ; + fe_msg.session_req = &sess_req; + + MGMTD_FE_CLIENT_DBG( + "Sending SESSION_REQ message for %s session %llu to MGMTD Frontend server", + create ? "creating" : "destroying", + (unsigned long long)session->client_id); + + return mgmt_fe_client_send_msg(client_ctx, &fe_msg); +} + +static int +mgmt_fe_send_lockds_req(struct mgmt_fe_client_ctx *client_ctx, + struct mgmt_fe_client_session *session, bool lock, + uint64_t req_id, Mgmtd__DatastoreId ds_id) +{ + (void)req_id; + Mgmtd__FeMessage fe_msg; + Mgmtd__FeLockDsReq lockds_req; + + mgmtd__fe_lock_ds_req__init(&lockds_req); + lockds_req.session_id = session->session_id; + lockds_req.req_id = req_id; + lockds_req.ds_id = ds_id; + lockds_req.lock = lock; + + mgmtd__fe_message__init(&fe_msg); + fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_LOCKDS_REQ; + fe_msg.lockds_req = &lockds_req; + + MGMTD_FE_CLIENT_DBG( + "Sending %sLOCK_REQ message for Ds:%d session %llu to MGMTD Frontend server", + lock ? 
"" : "UN", ds_id, (unsigned long long)session->client_id); + + return mgmt_fe_client_send_msg(client_ctx, &fe_msg); +} + +static int +mgmt_fe_send_setcfg_req(struct mgmt_fe_client_ctx *client_ctx, + struct mgmt_fe_client_session *session, + uint64_t req_id, Mgmtd__DatastoreId ds_id, + Mgmtd__YangCfgDataReq **data_req, int num_data_reqs, + bool implicit_commit, Mgmtd__DatastoreId dst_ds_id) +{ + (void)req_id; + Mgmtd__FeMessage fe_msg; + Mgmtd__FeSetConfigReq setcfg_req; + + mgmtd__fe_set_config_req__init(&setcfg_req); + setcfg_req.session_id = session->session_id; + setcfg_req.ds_id = ds_id; + setcfg_req.req_id = req_id; + setcfg_req.data = data_req; + setcfg_req.n_data = (size_t)num_data_reqs; + setcfg_req.implicit_commit = implicit_commit; + setcfg_req.commit_ds_id = dst_ds_id; + + mgmtd__fe_message__init(&fe_msg); + fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_SETCFG_REQ; + fe_msg.setcfg_req = &setcfg_req; + + MGMTD_FE_CLIENT_DBG( + "Sending SET_CONFIG_REQ message for Ds:%d session %llu (#xpaths:%d) to MGMTD Frontend server", + ds_id, (unsigned long long)session->client_id, num_data_reqs); + + return mgmt_fe_client_send_msg(client_ctx, &fe_msg); +} + +static int +mgmt_fe_send_commitcfg_req(struct mgmt_fe_client_ctx *client_ctx, + struct mgmt_fe_client_session *session, + uint64_t req_id, Mgmtd__DatastoreId src_ds_id, + Mgmtd__DatastoreId dest_ds_id, bool validate_only, + bool abort) +{ + (void)req_id; + Mgmtd__FeMessage fe_msg; + Mgmtd__FeCommitConfigReq commitcfg_req; + + mgmtd__fe_commit_config_req__init(&commitcfg_req); + commitcfg_req.session_id = session->session_id; + commitcfg_req.src_ds_id = src_ds_id; + commitcfg_req.dst_ds_id = dest_ds_id; + commitcfg_req.req_id = req_id; + commitcfg_req.validate_only = validate_only; + commitcfg_req.abort = abort; + + mgmtd__fe_message__init(&fe_msg); + fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_COMMCFG_REQ; + fe_msg.commcfg_req = &commitcfg_req; + + MGMTD_FE_CLIENT_DBG( + "Sending COMMIT_CONFIG_REQ message 
for Src-Ds:%d, Dst-Ds:%d session %llu to MGMTD Frontend server", + src_ds_id, dest_ds_id, (unsigned long long)session->client_id); + + return mgmt_fe_client_send_msg(client_ctx, &fe_msg); +} + +static int +mgmt_fe_send_getcfg_req(struct mgmt_fe_client_ctx *client_ctx, + struct mgmt_fe_client_session *session, + uint64_t req_id, Mgmtd__DatastoreId ds_id, + Mgmtd__YangGetDataReq * data_req[], + int num_data_reqs) +{ + (void)req_id; + Mgmtd__FeMessage fe_msg; + Mgmtd__FeGetConfigReq getcfg_req; + + mgmtd__fe_get_config_req__init(&getcfg_req); + getcfg_req.session_id = session->session_id; + getcfg_req.ds_id = ds_id; + getcfg_req.req_id = req_id; + getcfg_req.data = data_req; + getcfg_req.n_data = (size_t)num_data_reqs; + + mgmtd__fe_message__init(&fe_msg); + fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_GETCFG_REQ; + fe_msg.getcfg_req = &getcfg_req; + + MGMTD_FE_CLIENT_DBG( + "Sending GET_CONFIG_REQ message for Ds:%d session %llu (#xpaths:%d) to MGMTD Frontend server", + ds_id, (unsigned long long)session->client_id, num_data_reqs); + + return mgmt_fe_client_send_msg(client_ctx, &fe_msg); +} + +static int +mgmt_fe_send_getdata_req(struct mgmt_fe_client_ctx *client_ctx, + struct mgmt_fe_client_session *session, + uint64_t req_id, Mgmtd__DatastoreId ds_id, + Mgmtd__YangGetDataReq * data_req[], + int num_data_reqs) +{ + (void)req_id; + Mgmtd__FeMessage fe_msg; + Mgmtd__FeGetDataReq getdata_req; + + mgmtd__fe_get_data_req__init(&getdata_req); + getdata_req.session_id = session->session_id; + getdata_req.ds_id = ds_id; + getdata_req.req_id = req_id; + getdata_req.data = data_req; + getdata_req.n_data = (size_t)num_data_reqs; + + mgmtd__fe_message__init(&fe_msg); + fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_GETDATA_REQ; + fe_msg.getdata_req = &getdata_req; + + MGMTD_FE_CLIENT_DBG( + "Sending GET_CONFIG_REQ message for Ds:%d session %llu (#xpaths:%d) to MGMTD Frontend server", + ds_id, (unsigned long long)session->client_id, num_data_reqs); + + return 
mgmt_fe_client_send_msg(client_ctx, &fe_msg); +} + +static int mgmt_fe_send_regnotify_req( + struct mgmt_fe_client_ctx *client_ctx, + struct mgmt_fe_client_session *session, uint64_t req_id, + Mgmtd__DatastoreId ds_id, bool register_req, + Mgmtd__YangDataXPath * data_req[], int num_data_reqs) +{ + (void)req_id; + Mgmtd__FeMessage fe_msg; + Mgmtd__FeRegisterNotifyReq regntfy_req; + + mgmtd__fe_register_notify_req__init(®ntfy_req); + regntfy_req.session_id = session->session_id; + regntfy_req.ds_id = ds_id; + regntfy_req.register_req = register_req; + regntfy_req.data_xpath = data_req; + regntfy_req.n_data_xpath = (size_t)num_data_reqs; + + mgmtd__fe_message__init(&fe_msg); + fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_REGNOTIFY_REQ; + fe_msg.regnotify_req = ®ntfy_req; + + return mgmt_fe_client_send_msg(client_ctx, &fe_msg); +} + +static int +mgmt_fe_client_handle_msg(struct mgmt_fe_client_ctx *client_ctx, + Mgmtd__FeMessage *fe_msg) +{ + struct mgmt_fe_client_session *session = NULL; + + switch (fe_msg->message_case) { + case MGMTD__FE_MESSAGE__MESSAGE_SESSION_REPLY: + if (fe_msg->session_reply->create + && fe_msg->session_reply->has_client_conn_id) { + MGMTD_FE_CLIENT_DBG( + "Got Session Create Reply Msg for client-id %llu with session-id: %llu.", + (unsigned long long) + fe_msg->session_reply->client_conn_id, + (unsigned long long) + fe_msg->session_reply->session_id); + + session = mgmt_fe_find_session_by_client_id( + client_ctx, + fe_msg->session_reply->client_conn_id); + + if (session && fe_msg->session_reply->success) { + MGMTD_FE_CLIENT_DBG( + "Session Create for client-id %llu successful.", + (unsigned long long)fe_msg + ->session_reply->client_conn_id); + session->session_id = + fe_msg->session_reply->session_id; + } else { + MGMTD_FE_CLIENT_ERR( + "Session Create for client-id %llu failed.", + (unsigned long long)fe_msg + ->session_reply->client_conn_id); + } + } else if (!fe_msg->session_reply->create) { + MGMTD_FE_CLIENT_DBG( + "Got Session Destroy 
Reply Msg for session-id %llu", + (unsigned long long) + fe_msg->session_reply->session_id); + + session = mgmt_fe_find_session_by_session_id( + client_ctx, fe_msg->session_req->session_id); + } + + if (session && session->client_ctx + && session->client_ctx->client_params + .client_session_notify) + (*session->client_ctx->client_params + .client_session_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + session->client_id, + fe_msg->session_reply->create, + fe_msg->session_reply->success, + (uintptr_t)session, session->user_ctx); + break; + case MGMTD__FE_MESSAGE__MESSAGE_LOCKDS_REPLY: + MGMTD_FE_CLIENT_DBG( + "Got LockDs Reply Msg for session-id %llu", + (unsigned long long) + fe_msg->lockds_reply->session_id); + session = mgmt_fe_find_session_by_session_id( + client_ctx, fe_msg->lockds_reply->session_id); + + if (session && session->client_ctx + && session->client_ctx->client_params + .lock_ds_notify) + (*session->client_ctx->client_params + .lock_ds_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + session->client_id, (uintptr_t)session, + session->user_ctx, + fe_msg->lockds_reply->req_id, + fe_msg->lockds_reply->lock, + fe_msg->lockds_reply->success, + fe_msg->lockds_reply->ds_id, + fe_msg->lockds_reply->error_if_any); + break; + case MGMTD__FE_MESSAGE__MESSAGE_SETCFG_REPLY: + MGMTD_FE_CLIENT_DBG( + "Got Set Config Reply Msg for session-id %llu", + (unsigned long long) + fe_msg->setcfg_reply->session_id); + + session = mgmt_fe_find_session_by_session_id( + client_ctx, fe_msg->setcfg_reply->session_id); + + if (session && session->client_ctx + && session->client_ctx->client_params + .set_config_notify) + (*session->client_ctx->client_params + .set_config_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + session->client_id, (uintptr_t)session, + session->user_ctx, + fe_msg->setcfg_reply->req_id, + fe_msg->setcfg_reply->success, + fe_msg->setcfg_reply->ds_id, + 
fe_msg->setcfg_reply->error_if_any); + break; + case MGMTD__FE_MESSAGE__MESSAGE_COMMCFG_REPLY: + MGMTD_FE_CLIENT_DBG( + "Got Commit Config Reply Msg for session-id %llu", + (unsigned long long) + fe_msg->commcfg_reply->session_id); + + session = mgmt_fe_find_session_by_session_id( + client_ctx, fe_msg->commcfg_reply->session_id); + + if (session && session->client_ctx + && session->client_ctx->client_params + .commit_config_notify) + (*session->client_ctx->client_params + .commit_config_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + session->client_id, (uintptr_t)session, + session->user_ctx, + fe_msg->commcfg_reply->req_id, + fe_msg->commcfg_reply->success, + fe_msg->commcfg_reply->src_ds_id, + fe_msg->commcfg_reply->dst_ds_id, + fe_msg->commcfg_reply->validate_only, + fe_msg->commcfg_reply->error_if_any); + break; + case MGMTD__FE_MESSAGE__MESSAGE_GETCFG_REPLY: + MGMTD_FE_CLIENT_DBG( + "Got Get Config Reply Msg for session-id %llu", + (unsigned long long) + fe_msg->getcfg_reply->session_id); + + session = mgmt_fe_find_session_by_session_id( + client_ctx, fe_msg->getcfg_reply->session_id); + + if (session && session->client_ctx + && session->client_ctx->client_params + .get_data_notify) + (*session->client_ctx->client_params + .get_data_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + session->client_id, (uintptr_t)session, + session->user_ctx, + fe_msg->getcfg_reply->req_id, + fe_msg->getcfg_reply->success, + fe_msg->getcfg_reply->ds_id, + fe_msg->getcfg_reply->data + ? fe_msg->getcfg_reply->data->data + : NULL, + fe_msg->getcfg_reply->data + ? fe_msg->getcfg_reply->data->n_data + : 0, + fe_msg->getcfg_reply->data + ? 
fe_msg->getcfg_reply->data + ->next_indx + : 0, + fe_msg->getcfg_reply->error_if_any); + break; + case MGMTD__FE_MESSAGE__MESSAGE_GETDATA_REPLY: + MGMTD_FE_CLIENT_DBG( + "Got Get Data Reply Msg for session-id %llu", + (unsigned long long) + fe_msg->getdata_reply->session_id); + + session = mgmt_fe_find_session_by_session_id( + client_ctx, fe_msg->getdata_reply->session_id); + + if (session && session->client_ctx + && session->client_ctx->client_params + .get_data_notify) + (*session->client_ctx->client_params + .get_data_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + session->client_id, (uintptr_t)session, + session->user_ctx, + fe_msg->getdata_reply->req_id, + fe_msg->getdata_reply->success, + fe_msg->getdata_reply->ds_id, + fe_msg->getdata_reply->data + ? fe_msg->getdata_reply->data->data + : NULL, + fe_msg->getdata_reply->data + ? fe_msg->getdata_reply->data + ->n_data + : 0, + fe_msg->getdata_reply->data + ? fe_msg->getdata_reply->data + ->next_indx + : 0, + fe_msg->getdata_reply->error_if_any); + break; + case MGMTD__FE_MESSAGE__MESSAGE_NOTIFY_DATA_REQ: + case MGMTD__FE_MESSAGE__MESSAGE_REGNOTIFY_REQ: + /* + * TODO: Add handling code in future. + */ + break; + /* + * NOTE: The following messages are always sent from Frontend + * clients to MGMTd only and/or need not be handled here. 
+ */ + case MGMTD__FE_MESSAGE__MESSAGE_REGISTER_REQ: + case MGMTD__FE_MESSAGE__MESSAGE_SESSION_REQ: + case MGMTD__FE_MESSAGE__MESSAGE_LOCKDS_REQ: + case MGMTD__FE_MESSAGE__MESSAGE_SETCFG_REQ: + case MGMTD__FE_MESSAGE__MESSAGE_COMMCFG_REQ: + case MGMTD__FE_MESSAGE__MESSAGE_GETCFG_REQ: + case MGMTD__FE_MESSAGE__MESSAGE_GETDATA_REQ: + case MGMTD__FE_MESSAGE__MESSAGE__NOT_SET: +#if PROTOBUF_C_VERSION_NUMBER >= 1003000 + case _MGMTD__FE_MESSAGE__MESSAGE_IS_INT_SIZE: +#endif + default: + /* + * A 'default' case is being added contrary to the + * FRR code guidelines to take care of build + * failures on certain build systems (courtesy of + * the proto-c package). + */ + break; + } + + return 0; +} + +static int +mgmt_fe_client_process_msg(struct mgmt_fe_client_ctx *client_ctx, + uint8_t *msg_buf, int bytes_read) +{ + Mgmtd__FeMessage *fe_msg; + struct mgmt_fe_msg *msg; + uint16_t bytes_left; + uint16_t processed = 0; + + MGMTD_FE_CLIENT_DBG( + "Have %u bytes of messages from MGMTD Frontend server to .", + bytes_read); + + bytes_left = bytes_read; + for (; bytes_left > MGMTD_FE_MSG_HDR_LEN; + bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { + msg = (struct mgmt_fe_msg *)msg_buf; + if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) { + MGMTD_FE_CLIENT_DBG( + "Marker not found in message from MGMTD Frontend server."); + break; + } + + if (bytes_left < msg->hdr.len) { + MGMTD_FE_CLIENT_DBG( + "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend server.", + bytes_left, msg->hdr.len); + break; + } + + fe_msg = mgmtd__fe_message__unpack( + NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN), + msg->payload); + if (!fe_msg) { + MGMTD_FE_CLIENT_DBG( + "Failed to decode %d bytes from MGMTD Frontend server.", + msg->hdr.len); + continue; + } + + MGMTD_FE_CLIENT_DBG( + "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend server", + msg->hdr.len, fe_msg->message_case, + fe_msg->message_case); + + (void)mgmt_fe_client_handle_msg(client_ctx, fe_msg); + + 
mgmtd__fe_message__free_unpacked(fe_msg, NULL); + processed++; + client_ctx->num_msg_rx++; + } + + return processed; +} + +static void mgmt_fe_client_proc_msgbufs(struct thread *thread) +{ + struct mgmt_fe_client_ctx *client_ctx; + struct stream *work; + int processed = 0; + + client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) { + work = stream_fifo_pop_safe(client_ctx->ibuf_fifo); + if (!work) + break; + + processed += mgmt_fe_client_process_msg( + client_ctx, STREAM_DATA(work), stream_get_endp(work)); + + if (work != client_ctx->ibuf_work) { + /* Free it up */ + stream_free(work); + } else { + /* Reset stream buffer for next read */ + stream_reset(work); + } + } + + /* + * If we have more to process, reschedule for processing it. + */ + if (stream_fifo_head(client_ctx->ibuf_fifo)) + mgmt_fe_client_register_event(client_ctx, + MGMTD_FE_PROC_MSG); +} + +static void mgmt_fe_client_read(struct thread *thread) +{ + struct mgmt_fe_client_ctx *client_ctx; + int bytes_read, msg_cnt; + size_t total_bytes, bytes_left; + struct mgmt_fe_msg_hdr *msg_hdr; + bool incomplete = false; + + client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + total_bytes = 0; + bytes_left = STREAM_SIZE(client_ctx->ibuf_work) + - stream_get_endp(client_ctx->ibuf_work); + for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { + bytes_read = stream_read_try(client_ctx->ibuf_work, + client_ctx->conn_fd, bytes_left); + MGMTD_FE_CLIENT_DBG( + "Got %d bytes of message from MGMTD Frontend server", + bytes_read); + /* -2 is normal nothing read, and to retry */ + if (bytes_read == -2) + break; + if (bytes_read <= 0) { + if (bytes_read == 0) { + MGMTD_FE_CLIENT_ERR( + "Got EOF/disconnect while reading from MGMTD Frontend server."); + } else { + /* Fatal error */ + MGMTD_FE_CLIENT_ERR( + "Got error (%d) while reading from MGMTD Frontend server. 
Err: '%s'", + bytes_read, safe_strerror(errno)); + } + mgmt_fe_server_disconnect(client_ctx, true); + return; + } + total_bytes += bytes_read; + bytes_left -= bytes_read; + } + + /* + * Check if we have read complete messages or not. + */ + stream_set_getp(client_ctx->ibuf_work, 0); + total_bytes = 0; + msg_cnt = 0; + bytes_left = stream_get_endp(client_ctx->ibuf_work); + for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { + msg_hdr = (struct mgmt_fe_msg_hdr + *)(STREAM_DATA(client_ctx->ibuf_work) + + total_bytes); + if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) { + /* Corrupted buffer. Force disconnect?? */ + MGMTD_FE_CLIENT_ERR( + "Received corrupted buffer from MGMTD frontend server."); + mgmt_fe_server_disconnect(client_ctx, true); + return; + } + if (msg_hdr->len > bytes_left) + break; + + total_bytes += msg_hdr->len; + bytes_left -= msg_hdr->len; + msg_cnt++; + } + + if (!msg_cnt) + goto resched; + + if (bytes_left > 0) + incomplete = true; + + /* + * We have read one or several messages. + * Schedule processing them now. 
+ */ + msg_hdr = + (struct mgmt_fe_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) + + total_bytes); + stream_set_endp(client_ctx->ibuf_work, total_bytes); + stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work); + client_ctx->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); + if (incomplete) { + stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left); + stream_set_endp(client_ctx->ibuf_work, bytes_left); + } + + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG); + +resched: + mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ); +} + +static int mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx) +{ + int ret, sock, len; + struct sockaddr_un addr; + + MGMTD_FE_CLIENT_DBG( + "Trying to connect to MGMTD Frontend server at %s", + MGMTD_FE_SERVER_PATH); + + assert(!client_ctx->conn_fd); + + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { + MGMTD_FE_CLIENT_ERR("Failed to create socket"); + goto mgmt_fe_server_connect_failed; + } + + MGMTD_FE_CLIENT_DBG( + "Created MGMTD Frontend server socket successfully!"); + + memset(&addr, 0, sizeof(struct sockaddr_un)); + addr.sun_family = AF_UNIX; + strlcpy(addr.sun_path, MGMTD_FE_SERVER_PATH, sizeof(addr.sun_path)); +#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN + len = addr.sun_len = SUN_LEN(&addr); +#else + len = sizeof(addr.sun_family) + strlen(addr.sun_path); +#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */ + + ret = connect(sock, (struct sockaddr *)&addr, len); + if (ret < 0) { + MGMTD_FE_CLIENT_ERR( + "Failed to connect to MGMTD Frontend Server at %s. Err: %s", + addr.sun_path, safe_strerror(errno)); + close(sock); + goto mgmt_fe_server_connect_failed; + } + + MGMTD_FE_CLIENT_DBG( + "Connected to MGMTD Frontend Server at %s successfully!", + addr.sun_path); + client_ctx->conn_fd = sock; + + /* Make client socket non-blocking. 
*/ + set_nonblocking(sock); + setsockopt_so_sendbuf(client_ctx->conn_fd, + MGMTD_SOCKET_FE_SEND_BUF_SIZE); + setsockopt_so_recvbuf(client_ctx->conn_fd, + MGMTD_SOCKET_FE_RECV_BUF_SIZE); + + thread_add_read(client_ctx->tm, mgmt_fe_client_read, + (void *)&mgmt_fe_client_ctx, client_ctx->conn_fd, + &client_ctx->conn_read_ev); + assert(client_ctx->conn_read_ev); + + /* Send REGISTER_REQ message */ + if (mgmt_fe_send_register_req(client_ctx) != 0) + goto mgmt_fe_server_connect_failed; + + /* Notify client through registered callback (if any) */ + if (client_ctx->client_params.client_connect_notify) + (void)(*client_ctx->client_params.client_connect_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, true); + + return 0; + +mgmt_fe_server_connect_failed: + if (sock && sock != client_ctx->conn_fd) + close(sock); + + mgmt_fe_server_disconnect(client_ctx, true); + return -1; +} + +static void mgmt_fe_client_conn_timeout(struct thread *thread) +{ + struct mgmt_fe_client_ctx *client_ctx; + + client_ctx = (struct mgmt_fe_client_ctx *)THREAD_ARG(thread); + assert(client_ctx); + + mgmt_fe_server_connect(client_ctx); +} + +static void +mgmt_fe_client_register_event(struct mgmt_fe_client_ctx *client_ctx, + enum mgmt_fe_event event) +{ + struct timeval tv = {0}; + + switch (event) { + case MGMTD_FE_CONN_READ: + thread_add_read(client_ctx->tm, mgmt_fe_client_read, + client_ctx, client_ctx->conn_fd, + &client_ctx->conn_read_ev); + assert(client_ctx->conn_read_ev); + break; + case MGMTD_FE_CONN_WRITE: + thread_add_write(client_ctx->tm, mgmt_fe_client_write, + client_ctx, client_ctx->conn_fd, + &client_ctx->conn_write_ev); + assert(client_ctx->conn_write_ev); + break; + case MGMTD_FE_PROC_MSG: + tv.tv_usec = MGMTD_FE_MSG_PROC_DELAY_USEC; + thread_add_timer_tv(client_ctx->tm, + mgmt_fe_client_proc_msgbufs, client_ctx, + &tv, &client_ctx->msg_proc_ev); + assert(client_ctx->msg_proc_ev); + break; + case MGMTD_FE_CONN_WRITES_ON: + thread_add_timer_msec( + 
client_ctx->tm, mgmt_fe_client_resume_writes, + client_ctx, MGMTD_FE_MSG_WRITE_DELAY_MSEC, + &client_ctx->conn_writes_on); + assert(client_ctx->conn_writes_on); + break; + case MGMTD_FE_SERVER: + assert(!"mgmt_fe_client_ctx_post_event called incorrectly"); + break; + } +} + +static void mgmt_fe_client_schedule_conn_retry( + struct mgmt_fe_client_ctx *client_ctx, unsigned long intvl_secs) +{ + MGMTD_FE_CLIENT_DBG( + "Scheduling MGMTD Frontend server connection retry after %lu seconds", + intvl_secs); + thread_add_timer(client_ctx->tm, mgmt_fe_client_conn_timeout, + (void *)client_ctx, intvl_secs, + &client_ctx->conn_retry_tmr); +} + +/* + * Initialize library and try connecting with MGMTD. + */ +uintptr_t mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params, + struct thread_master *master_thread) +{ + assert(master_thread && params && strlen(params->name) + && !mgmt_fe_client_ctx.tm); + + mgmt_fe_client_ctx.tm = master_thread; + memcpy(&mgmt_fe_client_ctx.client_params, params, + sizeof(mgmt_fe_client_ctx.client_params)); + if (!mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec) + mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec = + MGMTD_FE_DEFAULT_CONN_RETRY_INTVL_SEC; + + assert(!mgmt_fe_client_ctx.ibuf_fifo + && !mgmt_fe_client_ctx.ibuf_work + && !mgmt_fe_client_ctx.obuf_fifo + && !mgmt_fe_client_ctx.obuf_work); + + mgmt_fe_client_ctx.ibuf_fifo = stream_fifo_new(); + mgmt_fe_client_ctx.ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); + mgmt_fe_client_ctx.obuf_fifo = stream_fifo_new(); + + mgmt_fe_client_ctx.obuf_work = NULL; + + mgmt_sessions_init(&mgmt_fe_client_ctx.client_sessions); + + /* Start trying to connect to MGMTD frontend server immediately */ + mgmt_fe_client_schedule_conn_retry(&mgmt_fe_client_ctx, 1); + + MGMTD_FE_CLIENT_DBG("Initialized client '%s'", params->name); + + return (uintptr_t)&mgmt_fe_client_ctx; +} + +/* + * Create a new Session for a Frontend Client connection. 
+ */ +enum mgmt_result mgmt_fe_create_client_session(uintptr_t lib_hndl, + uint64_t client_id, + uintptr_t user_ctx) +{ + struct mgmt_fe_client_ctx *client_ctx; + struct mgmt_fe_client_session *session; + + client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + session = XCALLOC(MTYPE_MGMTD_FE_SESSION, + sizeof(struct mgmt_fe_client_session)); + assert(session); + session->user_ctx = user_ctx; + session->client_id = client_id; + session->client_ctx = client_ctx; + session->session_id = 0; + + if (mgmt_fe_send_session_req(client_ctx, session, true) != 0) { + XFREE(MTYPE_MGMTD_FE_SESSION, session); + return MGMTD_INTERNAL_ERROR; + } + mgmt_sessions_add_tail(&client_ctx->client_sessions, session); + + return MGMTD_SUCCESS; +} + +/* + * Delete an existing Session for a Frontend Client connection. + */ +enum mgmt_result mgmt_fe_destroy_client_session(uintptr_t lib_hndl, + uint64_t client_id) +{ + struct mgmt_fe_client_ctx *client_ctx; + struct mgmt_fe_client_session *session; + + client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + session = mgmt_fe_find_session_by_client_id(client_ctx, client_id); + if (!session || session->client_ctx != client_ctx) + return MGMTD_INVALID_PARAM; + + if (session->session_id && + mgmt_fe_send_session_req(client_ctx, session, false) != 0) + MGMTD_FE_CLIENT_ERR( + "Failed to send session destroy request for the session-id %lu", + (unsigned long)session->session_id); + + mgmt_sessions_del(&client_ctx->client_sessions, session); + XFREE(MTYPE_MGMTD_FE_SESSION, session); + + return MGMTD_SUCCESS; +} + +static void mgmt_fe_destroy_client_sessions(uintptr_t lib_hndl) +{ + struct mgmt_fe_client_ctx *client_ctx; + struct mgmt_fe_client_session *session; + + client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl; + if (!client_ctx) + return; + + FOREACH_SESSION_IN_LIST (client_ctx, session) + mgmt_fe_destroy_client_session(lib_hndl, 
session->client_id);
+}
+
+/*
+ * Send UN/LOCK_DS_REQ to MGMTD for a specific Datastore DS.
+ */
+enum mgmt_result mgmt_fe_lock_ds(uintptr_t lib_hndl, uintptr_t session_id,
+				 uint64_t req_id, Mgmtd__DatastoreId ds_id,
+				 bool lock_ds)
+{
+	struct mgmt_fe_client_ctx *client_ctx;
+	struct mgmt_fe_client_session *session;
+
+	client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl;
+	if (!client_ctx)
+		return MGMTD_INVALID_PARAM;
+
+	session = (struct mgmt_fe_client_session *)session_id;
+	if (!session || session->client_ctx != client_ctx)
+		return MGMTD_INVALID_PARAM;
+
+	if (mgmt_fe_send_lockds_req(client_ctx, session, lock_ds, req_id,
+				    ds_id)
+	    != 0)
+		return MGMTD_INTERNAL_ERROR;
+
+	return MGMTD_SUCCESS;
+}
+
+/*
+ * Send SET_CONFIG_REQ to MGMTD for one or more config data(s).
+ */
+enum mgmt_result
+mgmt_fe_set_config_data(uintptr_t lib_hndl, uintptr_t session_id,
+			uint64_t req_id, Mgmtd__DatastoreId ds_id,
+			Mgmtd__YangCfgDataReq **config_req, int num_reqs,
+			bool implicit_commit, Mgmtd__DatastoreId dst_ds_id)
+{
+	struct mgmt_fe_client_ctx *client_ctx;
+	struct mgmt_fe_client_session *session;
+
+	client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl;
+	if (!client_ctx)
+		return MGMTD_INVALID_PARAM;
+
+	session = (struct mgmt_fe_client_session *)session_id;
+	if (!session || session->client_ctx != client_ctx)
+		return MGMTD_INVALID_PARAM;
+
+	if (mgmt_fe_send_setcfg_req(client_ctx, session, req_id, ds_id,
+				    config_req, num_reqs, implicit_commit,
+				    dst_ds_id)
+	    != 0)
+		return MGMTD_INTERNAL_ERROR;
+
+	return MGMTD_SUCCESS;
+}
+
+/*
+ * Send COMMIT_CONFIG_REQ to MGMTD for one or more config data(s).
+ */ +enum mgmt_result mgmt_fe_commit_config_data(uintptr_t lib_hndl, + uintptr_t session_id, + uint64_t req_id, + Mgmtd__DatastoreId src_ds_id, + Mgmtd__DatastoreId dst_ds_id, + bool validate_only, bool abort) +{ + struct mgmt_fe_client_ctx *client_ctx; + struct mgmt_fe_client_session *session; + + client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + session = (struct mgmt_fe_client_session *)session_id; + if (!session || session->client_ctx != client_ctx) + return MGMTD_INVALID_PARAM; + + if (mgmt_fe_send_commitcfg_req(client_ctx, session, req_id, src_ds_id, + dst_ds_id, validate_only, abort) + != 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Send GET_CONFIG_REQ to MGMTD for one or more config data item(s). + */ +enum mgmt_result +mgmt_fe_get_config_data(uintptr_t lib_hndl, uintptr_t session_id, + uint64_t req_id, Mgmtd__DatastoreId ds_id, + Mgmtd__YangGetDataReq * data_req[], int num_reqs) +{ + struct mgmt_fe_client_ctx *client_ctx; + struct mgmt_fe_client_session *session; + + client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + session = (struct mgmt_fe_client_session *)session_id; + if (!session || session->client_ctx != client_ctx) + return MGMTD_INVALID_PARAM; + + if (mgmt_fe_send_getcfg_req(client_ctx, session, req_id, ds_id, + data_req, num_reqs) + != 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Send GET_DATA_REQ to MGMTD for one or more config data item(s). 
+ */ +enum mgmt_result mgmt_fe_get_data(uintptr_t lib_hndl, uintptr_t session_id, + uint64_t req_id, Mgmtd__DatastoreId ds_id, + Mgmtd__YangGetDataReq * data_req[], + int num_reqs) +{ + struct mgmt_fe_client_ctx *client_ctx; + struct mgmt_fe_client_session *session; + + client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + session = (struct mgmt_fe_client_session *)session_id; + if (!session || session->client_ctx != client_ctx) + return MGMTD_INVALID_PARAM; + + if (mgmt_fe_send_getdata_req(client_ctx, session, req_id, ds_id, + data_req, num_reqs) + != 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Send NOTIFY_REGISTER_REQ to MGMTD daemon. + */ +enum mgmt_result +mgmt_fe_register_yang_notify(uintptr_t lib_hndl, uintptr_t session_id, + uint64_t req_id, Mgmtd__DatastoreId ds_id, + bool register_req, + Mgmtd__YangDataXPath * data_req[], + int num_reqs) +{ + struct mgmt_fe_client_ctx *client_ctx; + struct mgmt_fe_client_session *session; + + client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + session = (struct mgmt_fe_client_session *)session_id; + if (!session || session->client_ctx != client_ctx) + return MGMTD_INVALID_PARAM; + + if (mgmt_fe_send_regnotify_req(client_ctx, session, req_id, ds_id, + register_req, data_req, num_reqs) + != 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Destroy library and cleanup everything. 
+ */ +void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl) +{ + struct mgmt_fe_client_ctx *client_ctx; + + client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl; + assert(client_ctx); + + MGMTD_FE_CLIENT_DBG("Destroying MGMTD Frontend Client '%s'", + client_ctx->client_params.name); + + mgmt_fe_server_disconnect(client_ctx, false); + + assert(mgmt_fe_client_ctx.ibuf_fifo && mgmt_fe_client_ctx.obuf_fifo); + + mgmt_fe_destroy_client_sessions(lib_hndl); + + THREAD_OFF(client_ctx->conn_retry_tmr); + THREAD_OFF(client_ctx->conn_read_ev); + THREAD_OFF(client_ctx->conn_write_ev); + THREAD_OFF(client_ctx->conn_writes_on); + THREAD_OFF(client_ctx->msg_proc_ev); + stream_fifo_free(mgmt_fe_client_ctx.ibuf_fifo); + if (mgmt_fe_client_ctx.ibuf_work) + stream_free(mgmt_fe_client_ctx.ibuf_work); + stream_fifo_free(mgmt_fe_client_ctx.obuf_fifo); + if (mgmt_fe_client_ctx.obuf_work) + stream_free(mgmt_fe_client_ctx.obuf_work); +} diff --git a/lib/mgmt_fe_client.h b/lib/mgmt_fe_client.h new file mode 100644 index 0000000000..4ebecca215 --- /dev/null +++ b/lib/mgmt_fe_client.h @@ -0,0 +1,358 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Frontend Client Library api interfaces + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#ifndef _FRR_MGMTD_FE_CLIENT_H_ +#define _FRR_MGMTD_FE_CLIENT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "mgmtd/mgmt_defines.h" +#include "mgmt_pb.h" + +/*************************************************************** + * Macros + ***************************************************************/ + +/* + * The server port MGMTD daemon is listening for Backend Client + * connections. 
+ */ + +#define MGMTD_FE_CLIENT_ERROR_STRING_MAX_LEN 32 + +#define MGMTD_FE_DEFAULT_CONN_RETRY_INTVL_SEC 5 + +#define MGMTD_FE_MSG_PROC_DELAY_USEC 10 +#define MGMTD_FE_MAX_NUM_MSG_PROC 500 + +#define MGMTD_FE_MSG_WRITE_DELAY_MSEC 1 +#define MGMTD_FE_MAX_NUM_MSG_WRITE 100 + +#define GMGD_FE_MAX_NUM_REQ_ITEMS 64 + +#define MGMTD_FE_MSG_MAX_LEN 9000 + +#define MGMTD_SOCKET_FE_SEND_BUF_SIZE 65535 +#define MGMTD_SOCKET_FE_RECV_BUF_SIZE MGMTD_SOCKET_FE_SEND_BUF_SIZE + +/*************************************************************** + * Data-structures + ***************************************************************/ + +#define MGMTD_SESSION_ID_NONE 0 + +#define MGMTD_CLIENT_ID_NONE 0 + +#define MGMTD_DS_NONE MGMTD__DATASTORE_ID__DS_NONE +#define MGMTD_DS_RUNNING MGMTD__DATASTORE_ID__RUNNING_DS +#define MGMTD_DS_CANDIDATE MGMTD__DATASTORE_ID__CANDIDATE_DS +#define MGMTD_DS_OPERATIONAL MGMTD__DATASTORE_ID__OPERATIONAL_DS +#define MGMTD_DS_MAX_ID MGMTD_DS_OPERATIONAL + 1 + +struct mgmt_fe_msg_hdr { + uint16_t marker; + uint16_t len; /* Includes header */ +}; +#define MGMTD_FE_MSG_HDR_LEN sizeof(struct mgmt_fe_msg_hdr) +#define MGMTD_FE_MSG_MARKER 0xdeaf + +struct mgmt_fe_msg { + struct mgmt_fe_msg_hdr hdr; + uint8_t payload[]; +}; + +/* + * All the client specific information this library needs to + * initialize itself, setup connection with MGMTD FrontEnd interface + * and carry on all required procedures appropriately. + * + * FrontEnd clients need to initialise a instance of this structure + * with appropriate data and pass it while calling the API + * to initialize the library (See mgmt_fe_client_lib_init for + * more details). 
+ */ +struct mgmt_fe_client_params { + char name[MGMTD_CLIENT_NAME_MAX_LEN]; + uintptr_t user_data; + unsigned long conn_retry_intvl_sec; + + void (*client_connect_notify)(uintptr_t lib_hndl, + uintptr_t user_data, + bool connected); + + void (*client_session_notify)(uintptr_t lib_hndl, + uintptr_t user_data, + uint64_t client_id, + bool create, bool success, + uintptr_t session_id, + uintptr_t user_session_ctx); + + void (*lock_ds_notify)(uintptr_t lib_hndl, uintptr_t user_data, + uint64_t client_id, uintptr_t session_id, + uintptr_t user_session_ctx, uint64_t req_id, + bool lock_ds, bool success, + Mgmtd__DatastoreId ds_id, char *errmsg_if_any); + + void (*set_config_notify)(uintptr_t lib_hndl, uintptr_t user_data, + uint64_t client_id, uintptr_t session_id, + uintptr_t user_session_ctx, uint64_t req_id, + bool success, Mgmtd__DatastoreId ds_id, + char *errmsg_if_any); + + void (*commit_config_notify)( + uintptr_t lib_hndl, uintptr_t user_data, uint64_t client_id, + uintptr_t session_id, uintptr_t user_session_ctx, + uint64_t req_id, bool success, Mgmtd__DatastoreId src_ds_id, + Mgmtd__DatastoreId dst_ds_id, bool validate_only, + char *errmsg_if_any); + + enum mgmt_result (*get_data_notify)( + uintptr_t lib_hndl, uintptr_t user_data, uint64_t client_id, + uintptr_t session_id, uintptr_t user_session_ctx, + uint64_t req_id, bool success, Mgmtd__DatastoreId ds_id, + Mgmtd__YangData **yang_data, size_t num_data, int next_key, + char *errmsg_if_any); + + enum mgmt_result (*data_notify)( + uint64_t client_id, uint64_t session_id, uintptr_t user_data, + uint64_t req_id, Mgmtd__DatastoreId ds_id, + Mgmtd__YangData **yang_data, size_t num_data); +}; + +/*************************************************************** + * API prototypes + ***************************************************************/ + +/* + * Initialize library and try connecting with MGMTD FrontEnd interface. + * + * params + * Frontend client parameters. + * + * master_thread + * Thread master. 
+ * + * Returns: + * Frontend client lib handler (nothing but address of mgmt_fe_client_ctx) + */ +extern uintptr_t +mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params, + struct thread_master *master_thread); + +/* + * Create a new Session for a Frontend Client connection. + * + * lib_hndl + * Client library handler. + * + * client_id + * Unique identifier of client. + * + * user_ctx + * Client context. + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result mgmt_fe_create_client_session(uintptr_t lib_hndl, + uint64_t client_id, + uintptr_t user_ctx); + +/* + * Delete an existing Session for a Frontend Client connection. + * + * lib_hndl + * Client library handler. + * + * client_id + * Unique identifier of client. + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result mgmt_fe_destroy_client_session(uintptr_t lib_hndl, + uint64_t client_id); + +/* + * Send UN/LOCK_DS_REQ to MGMTD for a specific Datastore DS. + * + * lib_hndl + * Client library handler. + * + * session_id + * Client session ID. + * + * req_id + * Client request ID. + * + * ds_id + * Datastore ID (Running/Candidate/Oper/Startup) + * + * lock_ds + * TRUE for lock request, FALSE for unlock request. + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result +mgmt_fe_lock_ds(uintptr_t lib_hndl, uintptr_t session_id, uint64_t req_id, + Mgmtd__DatastoreId ds_id, bool lock_ds); + +/* + * Send SET_CONFIG_REQ to MGMTD for one or more config data(s). + * + * lib_hndl + * Client library handler. + * + * session_id + * Client session ID. + * + * req_id + * Client request ID. + * + * ds_id + * Datastore ID (Running/Candidate/Oper/Startup) + * + * conf_req + * Details regarding the SET_CONFIG_REQ. + * + * num_req + * Number of config requests. + * + * implcit commit + * TRUE for implicit commit, FALSE otherwise. + * + * dst_ds_id + * Destination Datastore ID where data needs to be set. 
+ *
+ * Returns:
+ *	MGMTD_SUCCESS on success, MGMTD_* otherwise.
+ */
+extern enum mgmt_result
+mgmt_fe_set_config_data(uintptr_t lib_hndl, uintptr_t session_id,
+			uint64_t req_id, Mgmtd__DatastoreId ds_id,
+			Mgmtd__YangCfgDataReq **config_req, int num_req,
+			bool implicit_commit, Mgmtd__DatastoreId dst_ds_id);
+
+/*
+ * Send COMMIT_CONFIG_REQ to MGMTD for one or more config data(s).
+ *
+ * lib_hndl
+ *    Client library handler.
+ *
+ * session_id
+ *    Client session ID.
+ *
+ * req_id
+ *    Client request ID.
+ *
+ * src_ds_id
+ *    Source datastore ID from where data needs to be committed.
+ *
+ * dst_ds_id
+ *    Destination datastore ID where data needs to be committed to.
+ *
+ * validate_only
+ *    TRUE if data needs to be validated only, FALSE otherwise.
+ *
+ * abort
+ *    TRUE if need to restore Src DS back to Dest DS, FALSE otherwise.
+ *
+ * Returns:
+ *    MGMTD_SUCCESS on success, MGMTD_* otherwise.
+ */
+extern enum mgmt_result
+mgmt_fe_commit_config_data(uintptr_t lib_hndl, uintptr_t session_id,
+			   uint64_t req_id, Mgmtd__DatastoreId src_ds_id,
+			   Mgmtd__DatastoreId dst_ds_id, bool validate_only,
+			   bool abort);
+
+/*
+ * Send GET_CONFIG_REQ to MGMTD for one or more config data item(s).
+ *
+ * lib_hndl
+ *    Client library handler.
+ *
+ * session_id
+ *    Client session ID.
+ *
+ * req_id
+ *    Client request ID.
+ *
+ * ds_id
+ *    Datastore ID (Running/Candidate)
+ *
+ * data_req
+ *    Get config requested.
+ *
+ * num_req
+ *    Number of get config requests.
+ *
+ * Returns:
+ *    MGMTD_SUCCESS on success, MGMTD_* otherwise.
+ */
+extern enum mgmt_result
+mgmt_fe_get_config_data(uintptr_t lib_hndl, uintptr_t session_id,
+			uint64_t req_id, Mgmtd__DatastoreId ds_id,
+			Mgmtd__YangGetDataReq **data_req, int num_reqs);
+
+/*
+ * Send GET_DATA_REQ to MGMTD for one or more data item(s).
+ *
+ * Similar to get config request but supports getting data
+ * from operational ds aka backend clients directly.
+ */ +extern enum mgmt_result +mgmt_fe_get_data(uintptr_t lib_hndl, uintptr_t session_id, uint64_t req_id, + Mgmtd__DatastoreId ds_id, Mgmtd__YangGetDataReq **data_req, + int num_reqs); + +/* + * Send NOTIFY_REGISTER_REQ to MGMTD daemon. + * + * lib_hndl + * Client library handler. + * + * session_id + * Client session ID. + * + * req_id + * Client request ID. + * + * ds_id + * Datastore ID. + * + * register_req + * TRUE if registering, FALSE otherwise. + * + * data_req + * Details of the YANG notification data. + * + * num_reqs + * Number of data requests. + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result +mgmt_fe_register_yang_notify(uintptr_t lib_hndl, uintptr_t session_id, + uint64_t req_id, Mgmtd__DatastoreId ds_id, + bool register_req, + Mgmtd__YangDataXPath **data_req, int num_reqs); + +/* + * Destroy library and cleanup everything. + */ +extern void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl); + +#ifdef __cplusplus +} +#endif + +#endif /* _FRR_MGMTD_FE_CLIENT_H_ */ diff --git a/lib/mgmt_pb.h b/lib/mgmt_pb.h new file mode 100644 index 0000000000..08bb748233 --- /dev/null +++ b/lib/mgmt_pb.h @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD protobuf main header file + * Copyright (C) 2021 Vmware, Inc. 
+ * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#ifndef _FRR_MGMTD_PB_H_ +#define _FRR_MGMTD_PB_H_ + +#include "lib/mgmt.pb-c.h" + +#define mgmt_yang_data_xpath_init(ptr) mgmtd__yang_data_xpath__init(ptr) + +#define mgmt_yang_data_value_init(ptr) mgmtd__yang_data_value__init(ptr) + +#define mgmt_yang_data_init(ptr) mgmtd__yang_data__init(ptr) + +#define mgmt_yang_data_reply_init(ptr) mgmtd__yang_data_reply__init(ptr) + +#define mgmt_yang_cfg_data_req_init(ptr) mgmtd__yang_cfg_data_req__init(ptr) + +#define mgmt_yang_get_data_req_init(ptr) mgmtd__yang_get_data_req__init(ptr) + +#endif /* _FRR_MGMTD_PB_H_ */ diff --git a/lib/northbound.h b/lib/northbound.h index 064e55cf77..572d669dc1 100644 --- a/lib/northbound.h +++ b/lib/northbound.h @@ -68,6 +68,12 @@ enum nb_operation { NB_OP_RPC, }; +struct nb_cfg_change { + char xpath[XPATH_MAXLEN]; + enum nb_operation operation; + const char *value; +}; + union nb_resource { int fd; void *ptr; diff --git a/lib/northbound_cli.c b/lib/northbound_cli.c index fa5884fb78..27db2059bb 100644 --- a/lib/northbound_cli.c +++ b/lib/northbound_cli.c @@ -120,7 +120,7 @@ static int nb_cli_schedule_command(struct vty *vty) void nb_cli_enqueue_change(struct vty *vty, const char *xpath, enum nb_operation operation, const char *value) { - struct vty_cfg_change *change; + struct nb_cfg_change *change; if (vty->num_cfg_changes == VTY_MAXCFGCHANGES) { /* Not expected to happen. */ @@ -149,7 +149,7 @@ static int nb_cli_apply_changes_internal(struct vty *vty, /* Edit candidate configuration. 
*/ for (size_t i = 0; i < vty->num_cfg_changes; i++) { - struct vty_cfg_change *change = &vty->cfg_changes[i]; + struct nb_cfg_change *change = &vty->cfg_changes[i]; struct nb_node *nb_node; char xpath[XPATH_MAXLEN]; struct yang_data *data; diff --git a/lib/subdir.am b/lib/subdir.am index beef8675aa..eb7cbc4f4f 100644 --- a/lib/subdir.am +++ b/lib/subdir.am @@ -64,6 +64,7 @@ lib_libfrr_la_SOURCES = \ lib/log_vty.c \ lib/md5.c \ lib/memory.c \ + lib/mgmt_fe_client.c \ lib/mlag.c \ lib/module.c \ lib/mpls.c \ @@ -146,6 +147,23 @@ nodist_lib_libfrr_la_SOURCES = \ yang/frr-module-translator.yang.c \ # end +# Add logic to build mgmt.proto +lib_libfrr_la_LIBADD += $(PROTOBUF_C_LIBS) + +BUILT_SOURCES += \ + lib/mgmt.pb-c.c \ + lib/mgmt.pb-c.h \ + # end + +CLEANFILES += \ + lib/mgmt.pb-c.h \ + lib/mgmt.pb-c.c \ + # end + +lib_libfrr_la_SOURCES += \ + lib/mgmt.pb-c.c \ + #end + if SQLITE3 lib_libfrr_la_LIBADD += $(SQLITE3_LIBS) lib_libfrr_la_SOURCES += lib/db.c @@ -222,6 +240,9 @@ pkginclude_HEADERS += \ lib/log_vty.h \ lib/md5.h \ lib/memory.h \ + lib/mgmt.pb-c.h \ + lib/mgmt_fe_client.h \ + lib/mgmt_pb.h \ lib/module.h \ lib/monotime.h \ lib/mpls.h \ @@ -65,6 +65,14 @@ enum vty_event { #endif /* VTYSH */ }; +struct nb_config *vty_mgmt_candidate_config; + +static uintptr_t mgmt_lib_hndl; +static bool mgmt_fe_connected; +static bool mgmt_candidate_ds_wr_locked; +static uint64_t mgmt_client_id_next; +static uint64_t mgmt_last_req_id = UINT64_MAX; + PREDECL_DLIST(vtyservs); struct vty_serv { @@ -80,6 +88,7 @@ DECLARE_DLIST(vtyservs, struct vty_serv, itm); static void vty_event_serv(enum vty_event event, struct vty_serv *); static void vty_event(enum vty_event, struct vty *); +static int vtysh_flush(struct vty *vty); /* Extern host structure from command.c */ extern struct host host; @@ -112,6 +121,36 @@ static char integrate_default[] = SYSCONFDIR INTEGRATE_DEFAULT_CONFIG; static bool do_log_commands; static bool do_log_commands_perm; +static void 
vty_mgmt_resume_response(struct vty *vty, bool success) +{ + uint8_t header[4] = {0, 0, 0, 0}; + int ret = CMD_SUCCESS; + + if (!vty->mgmt_req_pending) { + zlog_err( + "vty response called without setting mgmt_req_pending"); + return; + } + + if (!success) + ret = CMD_WARNING_CONFIG_FAILED; + + vty->mgmt_req_pending = false; + header[3] = ret; + buffer_put(vty->obuf, header, 4); + + if (!vty->t_write && (vtysh_flush(vty) < 0)) + /* Try to flush results; exit if a write + * error occurs. + */ + return; + + if (vty->status == VTY_CLOSE) + vty_close(vty); + else + vty_event(VTYSH_READ, vty); +} + void vty_frame(struct vty *vty, const char *format, ...) { va_list args; @@ -1586,6 +1625,17 @@ struct vty *vty_new(void) new->max = VTY_BUFSIZ; new->pass_fd = -1; + if (mgmt_lib_hndl) { + new->mgmt_client_id = mgmt_client_id_next++; + if (mgmt_fe_create_client_session(mgmt_lib_hndl, + new->mgmt_client_id, + (uintptr_t)new) + != MGMTD_SUCCESS) + zlog_err( + "Failed to open a MGMTD Frontend session for VTY session %p!!", + new); + } + return new; } @@ -2201,6 +2251,12 @@ static void vtysh_read(struct thread *thread) if (ret == CMD_SUSPEND) break; + /* with new infra we need to stop response till + * we get response through callback. + */ + if (vty->mgmt_req_pending) + return; + /* warning: watchfrr hardcodes this result write */ header[3] = ret; @@ -2257,6 +2313,12 @@ void vty_close(struct vty *vty) int i; bool was_stdio = false; + if (mgmt_lib_hndl) { + mgmt_fe_destroy_client_session(mgmt_lib_hndl, + vty->mgmt_client_id); + vty->mgmt_session_id = 0; + } + /* Drop out of configure / transaction if needed. 
*/ vty_config_exit(vty); @@ -2632,6 +2694,24 @@ int vty_config_enter(struct vty *vty, bool private_config, bool exclusive) return CMD_WARNING; } + if (vty_mgmt_fe_enabled()) { + if (!mgmt_candidate_ds_wr_locked) { + if (vty_mgmt_send_lockds_req(vty, MGMTD_DS_CANDIDATE, + true) + != 0) { + vty_out(vty, "Not able to lock candidate DS\n"); + return CMD_WARNING; + } + } else { + vty_out(vty, + "Candidate DS already locked by different session\n"); + return CMD_WARNING; + } + + vty->mgmt_locked_candidate_ds = true; + mgmt_candidate_ds_wr_locked = true; + } + vty->node = CONFIG_NODE; vty->config = true; vty->private_config = private_config; @@ -2643,7 +2723,14 @@ int vty_config_enter(struct vty *vty, bool private_config, bool exclusive) vty_out(vty, "Warning: uncommitted changes will be discarded on exit.\n\n"); } else { - vty->candidate_config = vty_shared_candidate_config; + /* + * NOTE: On the MGMTD daemon we point the VTY candidate DS to + * the global MGMTD candidate DS. Else we point to the VTY + * Shared Candidate Config. + */ + vty->candidate_config = vty_mgmt_candidate_config + ? vty_mgmt_candidate_config + : vty_shared_candidate_config; if (frr_get_cli_mode() == FRR_CLI_TRANSACTIONAL) vty->candidate_config_base = nb_config_dup(running_config); @@ -2676,6 +2763,18 @@ int vty_config_node_exit(struct vty *vty) { vty->xpath_index = 0; + if (vty_mgmt_fe_enabled() && mgmt_candidate_ds_wr_locked + && vty->mgmt_locked_candidate_ds) { + if (vty_mgmt_send_lockds_req(vty, MGMTD_DS_CANDIDATE, false) + != 0) { + vty_out(vty, "Not able to unlock candidate DS\n"); + return CMD_WARNING; + } + + vty->mgmt_locked_candidate_ds = false; + mgmt_candidate_ds_wr_locked = false; + } + /* Perform any pending commits. 
*/ (void)nb_cli_pending_commit_check(vty); @@ -3173,6 +3272,398 @@ void vty_init_vtysh(void) /* currently nothing to do, but likely to have future use */ } +static void vty_mgmt_server_connected(uintptr_t lib_hndl, uintptr_t usr_data, + bool connected) +{ + zlog_err("%sGot %sconnected %s MGMTD Frontend Server", + !connected ? "ERROR: " : "", !connected ? "dis: " : "", + !connected ? "from" : "to"); + + mgmt_fe_connected = connected; + + /* + * TODO: Setup or teardown front-end sessions for existing + * VTY connections. + */ +} + +static void vty_mgmt_session_created(uintptr_t lib_hndl, uintptr_t usr_data, + uint64_t client_id, bool create, + bool success, uintptr_t session_id, + uintptr_t session_ctx) +{ + struct vty *vty; + + vty = (struct vty *)session_ctx; + + if (!success) { + zlog_err("%s session for client %llu failed!", + create ? "Creating" : "Destroying", + (unsigned long long)client_id); + return; + } + + zlog_err("%s session for client %llu successfully!", + create ? "Created" : "Destroyed", + (unsigned long long)client_id); + if (create) + vty->mgmt_session_id = session_id; +} + +static void vty_mgmt_ds_lock_notified(uintptr_t lib_hndl, uintptr_t usr_data, + uint64_t client_id, uintptr_t session_id, + uintptr_t session_ctx, uint64_t req_id, + bool lock_ds, bool success, + Mgmtd__DatastoreId ds_id, + char *errmsg_if_any) +{ + struct vty *vty; + + vty = (struct vty *)session_ctx; + + if (!success) { + zlog_err("%socking for DS %u failed! Err: '%s'", + lock_ds ? "L" : "Unl", ds_id, errmsg_if_any); + vty_out(vty, "ERROR: %socking for DS %u failed! Err: '%s'\n", + lock_ds ? "L" : "Unl", ds_id, errmsg_if_any); + } else { + zlog_err("%socked DS %u successfully!", lock_ds ? 
"L" : "Unl", + ds_id); + } + + vty_mgmt_resume_response(vty, success); +} + +static void vty_mgmt_set_config_result_notified( + uintptr_t lib_hndl, uintptr_t usr_data, uint64_t client_id, + uintptr_t session_id, uintptr_t session_ctx, uint64_t req_id, + bool success, Mgmtd__DatastoreId ds_id, char *errmsg_if_any) +{ + struct vty *vty; + + vty = (struct vty *)session_ctx; + + if (!success) { + zlog_err( + "SET_CONFIG request for client 0x%llx failed! Error: '%s'", + (unsigned long long)client_id, + errmsg_if_any ? errmsg_if_any : "Unknown"); + vty_out(vty, "ERROR: SET_CONFIG request failed! Error: %s\n", + errmsg_if_any ? errmsg_if_any : "Unknown"); + } else { + zlog_err( + "SET_CONFIG request for client 0x%llx req-id %llu was successfull!", + (unsigned long long)client_id, + (unsigned long long)req_id); + } + + vty_mgmt_resume_response(vty, success); +} + +static void vty_mgmt_commit_config_result_notified( + uintptr_t lib_hndl, uintptr_t usr_data, uint64_t client_id, + uintptr_t session_id, uintptr_t session_ctx, uint64_t req_id, + bool success, Mgmtd__DatastoreId src_ds_id, Mgmtd__DatastoreId dst_ds_id, + bool validate_only, char *errmsg_if_any) +{ + struct vty *vty; + + vty = (struct vty *)session_ctx; + + if (!success) { + zlog_err( + "COMMIT_CONFIG request for client 0x%llx failed! Error: '%s'", + (unsigned long long)client_id, + errmsg_if_any ? errmsg_if_any : "Unknown"); + vty_out(vty, "ERROR: COMMIT_CONFIG request failed! Error: %s\n", + errmsg_if_any ? 
errmsg_if_any : "Unknown"); + } else { + zlog_err( + "COMMIT_CONFIG request for client 0x%llx req-id %llu was successfull!", + (unsigned long long)client_id, + (unsigned long long)req_id); + if (errmsg_if_any) + vty_out(vty, "MGMTD: %s\n", errmsg_if_any); + } + + vty_mgmt_resume_response(vty, success); +} + +static enum mgmt_result vty_mgmt_get_data_result_notified( + uintptr_t lib_hndl, uintptr_t usr_data, uint64_t client_id, + uintptr_t session_id, uintptr_t session_ctx, uint64_t req_id, + bool success, Mgmtd__DatastoreId ds_id, Mgmtd__YangData **yang_data, + size_t num_data, int next_key, char *errmsg_if_any) +{ + struct vty *vty; + size_t indx; + + vty = (struct vty *)session_ctx; + + if (!success) { + zlog_err( + "GET_DATA request for client 0x%llx failed! Error: '%s'", + (unsigned long long)client_id, + errmsg_if_any ? errmsg_if_any : "Unknown"); + vty_out(vty, "ERROR: GET_DATA request failed! Error: %s\n", + errmsg_if_any ? errmsg_if_any : "Unknown"); + vty_mgmt_resume_response(vty, success); + return MGMTD_INTERNAL_ERROR; + } + + zlog_debug( + "GET_DATA request for client 0x%llx req-id %llu was successfull!", + (unsigned long long)client_id, (unsigned long long)req_id); + + if (req_id != mgmt_last_req_id) { + mgmt_last_req_id = req_id; + vty_out(vty, "[\n"); + } + + for (indx = 0; indx < num_data; indx++) { + vty_out(vty, " \"%s\": \"%s\"\n", yang_data[indx]->xpath, + yang_data[indx]->value->encoded_str_val); + } + if (next_key < 0) { + vty_out(vty, "]\n"); + vty_mgmt_resume_response(vty, success); + } + + return MGMTD_SUCCESS; +} + +static struct mgmt_fe_client_params client_params = { + .client_connect_notify = vty_mgmt_server_connected, + .client_session_notify = vty_mgmt_session_created, + .lock_ds_notify = vty_mgmt_ds_lock_notified, + .set_config_notify = + vty_mgmt_set_config_result_notified, + .commit_config_notify = + vty_mgmt_commit_config_result_notified, + .get_data_notify = vty_mgmt_get_data_result_notified, +}; + +void vty_init_mgmt_fe(void) +{ 
+ if (!vty_master) { + zlog_err( + "Always call vty_mgmt_init_fe() after vty_init()!!"); + return; + } + + assert(!mgmt_lib_hndl); + snprintf(client_params.name, sizeof(client_params.name), "%s-%lld", + frr_get_progname(), (long long)getpid()); + mgmt_lib_hndl = mgmt_fe_client_lib_init(&client_params, vty_master); + assert(mgmt_lib_hndl); +} + +bool vty_mgmt_fe_enabled(void) +{ + return mgmt_lib_hndl && mgmt_fe_connected ? true : false; +} + +int vty_mgmt_send_lockds_req(struct vty *vty, Mgmtd__DatastoreId ds_id, + bool lock) +{ + enum mgmt_result ret; + + if (mgmt_lib_hndl && vty->mgmt_session_id) { + vty->mgmt_req_id++; + ret = mgmt_fe_lock_ds(mgmt_lib_hndl, vty->mgmt_session_id, + vty->mgmt_req_id, ds_id, lock); + if (ret != MGMTD_SUCCESS) { + zlog_err( + "Failed to send %sLOCK-DS-REQ to MGMTD for req-id %llu.", + lock ? "" : "UN", + (unsigned long long)vty->mgmt_req_id); + vty_out(vty, "Failed to send %sLOCK-DS-REQ to MGMTD!", + lock ? "" : "UN"); + return -1; + } + + vty->mgmt_req_pending = true; + } + + return 0; +} + +int vty_mgmt_send_config_data(struct vty *vty) +{ + Mgmtd__YangDataValue value[VTY_MAXCFGCHANGES]; + Mgmtd__YangData cfg_data[VTY_MAXCFGCHANGES]; + Mgmtd__YangCfgDataReq cfg_req[VTY_MAXCFGCHANGES]; + Mgmtd__YangCfgDataReq * cfgreq[VTY_MAXCFGCHANGES] = {0}; + size_t indx; + int cnt; + + if (mgmt_lib_hndl && vty->mgmt_session_id) { + cnt = 0; + for (indx = 0; indx < vty->num_cfg_changes; indx++) { + mgmt_yang_data_init(&cfg_data[cnt]); + + if (vty->cfg_changes[indx].value) { + mgmt_yang_data_value_init(&value[cnt]); + value[cnt].encoded_str_val = + (char *)vty->cfg_changes[indx].value; + value[cnt].value_case = + MGMTD__YANG_DATA_VALUE__VALUE_ENCODED_STR_VAL; + cfg_data[cnt].value = &value[cnt]; + } + + cfg_data[cnt].xpath = vty->cfg_changes[indx].xpath; + + mgmt_yang_cfg_data_req_init(&cfg_req[cnt]); + cfg_req[cnt].data = &cfg_data[cnt]; + switch (vty->cfg_changes[indx].operation) { + case NB_OP_DESTROY: + cfg_req[cnt].req_type = + 
MGMTD__CFG_DATA_REQ_TYPE__DELETE_DATA; + break; + + case NB_OP_CREATE: + case NB_OP_MODIFY: + case NB_OP_MOVE: + case NB_OP_PRE_VALIDATE: + case NB_OP_APPLY_FINISH: + cfg_req[cnt].req_type = + MGMTD__CFG_DATA_REQ_TYPE__SET_DATA; + break; + case NB_OP_GET_ELEM: + case NB_OP_GET_NEXT: + case NB_OP_GET_KEYS: + case NB_OP_LOOKUP_ENTRY: + case NB_OP_RPC: + assert(!"Invalid type of operation"); + break; + default: + assert(!"non-enum value, invalid"); + } + + cfgreq[cnt] = &cfg_req[cnt]; + cnt++; + } + + vty->mgmt_req_id++; + if (cnt + && mgmt_fe_set_config_data( + mgmt_lib_hndl, vty->mgmt_session_id, + vty->mgmt_req_id, MGMTD_DS_CANDIDATE, cfgreq, + cnt, + frr_get_cli_mode() == FRR_CLI_CLASSIC + ? ((vty->pending_allowed + || vty->no_implicit_commit) + ? false + : true) + : false, + MGMTD_DS_RUNNING) + != MGMTD_SUCCESS) { + zlog_err("Failed to send %d Config Xpaths to MGMTD!!", + (int)indx); + return -1; + } + + vty->mgmt_req_pending = true; + } + + return 0; +} + +int vty_mgmt_send_commit_config(struct vty *vty, bool validate_only, bool abort) +{ + enum mgmt_result ret; + + if (mgmt_lib_hndl && vty->mgmt_session_id) { + vty->mgmt_req_id++; + ret = mgmt_fe_commit_config_data( + mgmt_lib_hndl, vty->mgmt_session_id, vty->mgmt_req_id, + MGMTD_DS_CANDIDATE, MGMTD_DS_RUNNING, validate_only, + abort); + if (ret != MGMTD_SUCCESS) { + zlog_err( + "Failed to send COMMIT-REQ to MGMTD for req-id %llu.", + (unsigned long long)vty->mgmt_req_id); + vty_out(vty, "Failed to send COMMIT-REQ to MGMTD!"); + return -1; + } + + vty->mgmt_req_pending = true; + } + + return 0; +} + +int vty_mgmt_send_get_config(struct vty *vty, Mgmtd__DatastoreId datastore, + const char **xpath_list, int num_req) +{ + enum mgmt_result ret; + Mgmtd__YangData yang_data[VTY_MAXCFGCHANGES]; + Mgmtd__YangGetDataReq get_req[VTY_MAXCFGCHANGES]; + Mgmtd__YangGetDataReq * getreq[VTY_MAXCFGCHANGES]; + int i; + + vty->mgmt_req_id++; + + for (i = 0; i < num_req; i++) { + mgmt_yang_get_data_req_init(&get_req[i]); + 
mgmt_yang_data_init(&yang_data[i]); + + yang_data->xpath = (char *)xpath_list[i]; + + get_req[i].data = &yang_data[i]; + getreq[i] = &get_req[i]; + } + ret = mgmt_fe_get_config_data(mgmt_lib_hndl, vty->mgmt_session_id, + vty->mgmt_req_id, datastore, getreq, + num_req); + + if (ret != MGMTD_SUCCESS) { + zlog_err("Failed to send GET-CONFIG to MGMTD for req-id %llu.", + (unsigned long long)vty->mgmt_req_id); + vty_out(vty, "Failed to send GET-CONFIG to MGMTD!"); + return -1; + } + + vty->mgmt_req_pending = true; + + return 0; +} + +int vty_mgmt_send_get_data(struct vty *vty, Mgmtd__DatastoreId datastore, + const char **xpath_list, int num_req) +{ + enum mgmt_result ret; + Mgmtd__YangData yang_data[VTY_MAXCFGCHANGES]; + Mgmtd__YangGetDataReq get_req[VTY_MAXCFGCHANGES]; + Mgmtd__YangGetDataReq * getreq[VTY_MAXCFGCHANGES]; + int i; + + vty->mgmt_req_id++; + + for (i = 0; i < num_req; i++) { + mgmt_yang_get_data_req_init(&get_req[i]); + mgmt_yang_data_init(&yang_data[i]); + + yang_data->xpath = (char *)xpath_list[i]; + + get_req[i].data = &yang_data[i]; + getreq[i] = &get_req[i]; + } + ret = mgmt_fe_get_data(mgmt_lib_hndl, vty->mgmt_session_id, + vty->mgmt_req_id, datastore, getreq, num_req); + + if (ret != MGMTD_SUCCESS) { + zlog_err("Failed to send GET-DATA to MGMTD for req-id %llu.", + (unsigned long long)vty->mgmt_req_id); + vty_out(vty, "Failed to send GET-DATA to MGMTD!"); + return -1; + } + + vty->mgmt_req_pending = true; + + return 0; +} + /* Install vty's own commands like `who' command. 
*/ void vty_init(struct thread_master *master_thread, bool do_command_logging) { @@ -3220,6 +3711,11 @@ void vty_terminate(void) struct vty *vty; struct vty_serv *vtyserv; + if (mgmt_lib_hndl) { + mgmt_fe_client_lib_destroy(mgmt_lib_hndl); + mgmt_lib_hndl = 0; + } + memset(vty_cwd, 0x00, sizeof(vty_cwd)); vty_reset(); @@ -25,6 +25,7 @@ #include "compiler.h" #include "northbound.h" #include "zlog_live.h" +#include "mgmt_fe_client.h" #ifdef __cplusplus extern "C" { @@ -113,7 +114,7 @@ struct vty { /* Changes enqueued to be applied in the candidate configuration. */ size_t num_cfg_changes; - struct vty_cfg_change cfg_changes[VTY_MAXCFGCHANGES]; + struct nb_cfg_change cfg_changes[VTY_MAXCFGCHANGES]; /* XPath of the current node */ int xpath_index; @@ -134,6 +135,7 @@ struct vty { /* Dynamic transaction information. */ bool pending_allowed; bool pending_commit; + bool no_implicit_commit; char *pending_cmds_buf; size_t pending_cmds_buflen; size_t pending_cmds_bufpos; @@ -208,6 +210,12 @@ struct vty { * without any output. */ size_t frame_pos; char frame[1024]; + + uintptr_t mgmt_session_id; + uint64_t mgmt_client_id; + uint64_t mgmt_req_id; + bool mgmt_req_pending; + bool mgmt_locked_candidate_ds; }; static inline void vty_push_context(struct vty *vty, int node, uint64_t id) @@ -319,6 +327,8 @@ struct vty_arg { #define IS_DIRECTORY_SEP(c) ((c) == DIRECTORY_SEP) #endif +extern struct nb_config *vty_mgmt_candidate_config; + /* Prototypes. 
*/ extern void vty_init(struct thread_master *, bool do_command_logging); extern void vty_init_vtysh(void); @@ -370,6 +380,18 @@ extern void vty_stdio_suspend(void); extern void vty_stdio_resume(void); extern void vty_stdio_close(void); +extern void vty_init_mgmt_fe(void); +extern bool vty_mgmt_fe_enabled(void); +extern int vty_mgmt_send_config_data(struct vty *vty); +extern int vty_mgmt_send_commit_config(struct vty *vty, bool validate_only, + bool abort); +extern int vty_mgmt_send_get_config(struct vty *vty, Mgmtd__DatastoreId datastore, + const char **xpath_list, int num_req); +extern int vty_mgmt_send_get_data(struct vty *vty, Mgmtd__DatastoreId datastore, + const char **xpath_list, int num_req); +extern int vty_mgmt_send_lockds_req(struct vty *vty, Mgmtd__DatastoreId ds_id, + bool lock); + #ifdef __cplusplus } #endif |
