#include "command.h"
#include "hook.h"
+#include "frr_pthread.h"
+#include "mlag.h"
#include "zebra/zebra_mlag.h"
+#include "zebra/zebra_mlag_private.h"
#include "zebra/zebra_router.h"
+#include "zebra/zebra_memory.h"
#include "zebra/zapi_msg.h"
#include "zebra/debug.h"
#include "zebra/zebra_mlag_clippy.c"
#endif
+#define ZEBRA_MLAG_METADATA_LEN 4
+#define ZEBRA_MLAG_MSG_BCAST 0xFFFFFFFF
+
+uint8_t mlag_wr_buffer[ZEBRA_MLAG_BUF_LIMIT];
+uint8_t mlag_rd_buffer[ZEBRA_MLAG_BUF_LIMIT];
+uint32_t mlag_rd_buf_offset;
+
+static bool test_mlag_in_progress;
+
+static int zebra_mlag_signal_write_thread(void);
+static int zebra_mlag_terminate_pthread(struct thread *event);
+static int zebra_mlag_post_data_from_main_thread(struct thread *thread);
+static void zebra_mlag_publish_process_state(struct zserv *client,
+ zebra_message_types_t msg_type);
+
+/**********************MLAG Interaction***************************************/
+
+/*
+ * API to post the Registration to MLAGD
+ * MLAG will not process any messages with out the registration
+ */
+void zebra_mlag_send_register(void)
+{
+ struct stream *s = NULL;
+
+ s = stream_new(sizeof(struct mlag_msg));
+
+ stream_putl(s, MLAG_REGISTER);
+ stream_putw(s, MLAG_MSG_NULL_PAYLOAD);
+ stream_putw(s, MLAG_MSG_NO_BATCH);
+ stream_fifo_push_safe(zrouter.mlag_info.mlag_fifo, s);
+ zebra_mlag_signal_write_thread();
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("%s: Enqueued MLAG Register to MLAG Thread ",
+ __func__);
+}
+
+/*
+ * API to post the De-Registration to MLAGD
+ * MLAG will not process any messages after the de-registration
+ */
+void zebra_mlag_send_deregister(void)
+{
+ struct stream *s = NULL;
+
+ s = stream_new(sizeof(struct mlag_msg));
+
+ stream_putl(s, MLAG_DEREGISTER);
+ stream_putw(s, MLAG_MSG_NULL_PAYLOAD);
+ stream_putw(s, MLAG_MSG_NO_BATCH);
+ stream_fifo_push_safe(zrouter.mlag_info.mlag_fifo, s);
+ zebra_mlag_signal_write_thread();
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("%s: Enqueued MLAG De-Register to MLAG Thread ",
+ __func__);
+}
+
+/*
+ * API To handle MLAG Received data
+ * Decodes the data using protobuf and enqueue to main thread
+ * main thread publish this to clients based on client subscription
+ */
+void zebra_mlag_process_mlag_data(uint8_t *data, uint32_t len)
+{
+ struct stream *s = NULL;
+ struct stream *s1 = NULL;
+ int msg_type = 0;
+
+ s = stream_new(ZEBRA_MAX_PACKET_SIZ);
+ msg_type = zebra_mlag_protobuf_decode_message(&s, data, len);
+
+ if (msg_type <= 0) {
+ /* Something went wrong in decoding */
+ stream_free(s);
+ zlog_err("%s: failed to process mlag data-%d, %u", __func__,
+ msg_type, len);
+ return;
+ }
+
+ /*
+ * additional four bytes are for message type
+ */
+ s1 = stream_new(stream_get_endp(s) + ZEBRA_MLAG_METADATA_LEN);
+ stream_putl(s1, msg_type);
+ stream_put(s1, s->data, stream_get_endp(s));
+ thread_add_event(zrouter.master, zebra_mlag_post_data_from_main_thread,
+ s1, 0, NULL);
+ stream_free(s);
+}
+
+/**********************End of MLAG Interaction********************************/
+
+/************************MLAG Thread Processing*******************************/
+
+/*
+ * after posting every 'ZEBRA_MLAG_POST_LIMIT' packets, MLAG Thread will be
+ * yielded to give CPU for other threads
+ */
+#define ZEBRA_MLAG_POST_LIMIT 100
+
+/*
+ * This thread reads the clients data from the Global queue and encodes with
+ * protobuf and pass on to the MLAG socket.
+ */
+static int zebra_mlag_client_msg_handler(struct thread *event)
+{
+ struct stream *s;
+ uint32_t wr_count = 0;
+ uint32_t msg_type = 0;
+ int len = 0;
+
+ wr_count = stream_fifo_count_safe(zrouter.mlag_info.mlag_fifo);
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(":%s: Processing MLAG write, %d messages in queue",
+ __func__, wr_count);
+
+ zrouter.mlag_info.t_write = NULL;
+ for (wr_count = 0; wr_count < ZEBRA_MLAG_POST_LIMIT; wr_count++) {
+ /* FIFO is empty,wait for teh message to be add */
+ if (stream_fifo_count_safe(zrouter.mlag_info.mlag_fifo) == 0)
+ break;
+
+ s = stream_fifo_pop_safe(zrouter.mlag_info.mlag_fifo);
+ if (!s) {
+ zlog_debug(":%s: Got a NULL Messages, some thing wrong",
+ __func__);
+ break;
+ }
+
+ zebra_mlag_reset_write_buffer();
+ /*
+ * Encode the data now
+ */
+ len = zebra_mlag_protobuf_encode_client_data(s, &msg_type);
+
+ /*
+ * write to MCLAGD
+ */
+ if (len > 0)
+ zebra_mlag_private_write_data(mlag_wr_buffer, len);
+
+ /*
+ * If message type is De-register, send a signal to main thread,
+ * so that necessary cleanup will be done by main thread.
+ */
+ if (msg_type == MLAG_DEREGISTER) {
+ thread_add_event(zrouter.master,
+ zebra_mlag_terminate_pthread, NULL, 0,
+ NULL);
+ }
+
+ stream_free(s);
+ }
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(":%s: Posted %d messages to MLAGD", __func__,
+ wr_count);
+ /*
+ * Currently there is only message write task is enqueued to this
+ * thread, yielding was added for future purpose, so that this thread
+ * can server other tasks also and in case FIFO is empty, this task will
+ * be schedule when main thread adds some messages
+ */
+ if (wr_count >= ZEBRA_MLAG_POST_LIMIT)
+ zebra_mlag_signal_write_thread();
+ return 0;
+}
+
+/*
+ * API to handle the process state.
+ * In case of Down, Zebra keep monitoring the MLAG state.
+ * all the state Notifications will be published to clients
+ */
+void zebra_mlag_handle_process_state(enum zebra_mlag_state state)
+{
+ if (state == MLAG_UP) {
+ zrouter.mlag_info.connected = true;
+ zebra_mlag_publish_process_state(NULL, ZEBRA_MLAG_PROCESS_UP);
+ zebra_mlag_send_register();
+ } else if (state == MLAG_DOWN) {
+ zrouter.mlag_info.connected = false;
+ zebra_mlag_publish_process_state(NULL, ZEBRA_MLAG_PROCESS_DOWN);
+ zebra_mlag_private_monitor_state();
+ }
+}
+
+/***********************End of MLAG Thread processing*************************/
+
+/*************************Multi-entratnt Api's********************************/
+
+/*
+ * Provider api to signal that work/events are available
+ * for the Zebra MLAG Write pthread.
+ * This API is called from 2 pthreads..
+ * 1) by main thread when client posts a MLAG Message
+ * 2) by MLAG Thread, in case of yield
+ * though this api, is called from two threads we don't need any locking
+ * because Thread task enqueue is thread safe means internally it had
+ * necessary protection
+ */
+static int zebra_mlag_signal_write_thread(void)
+{
+ if (zrouter.mlag_info.zebra_pth_mlag) {
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(":%s: Scheduling MLAG write", __func__);
+ thread_add_event(zrouter.mlag_info.th_master,
+ zebra_mlag_client_msg_handler, NULL, 0,
+ &zrouter.mlag_info.t_write);
+ }
+ return 0;
+}
+
+/*
+ * API will be used to publish the MLAG state to interested clients
+ * In case client is passed, state is posted only for that client,
+ * otherwise to all interested clients
+ * this api can be called from two threads.
+ * 1) from main thread: when client is passed
+ * 2) from MLAG Thread: when client is NULL
+ *
+ * In second case, to avoid global data access data will be post to Main
+ * thread, so that actual posting to clients will happen from Main thread.
+ */
+static void zebra_mlag_publish_process_state(struct zserv *client,
+ zebra_message_types_t msg_type)
+{
+ struct stream *s;
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("%s: Publishing MLAG process state:%s to %s Client",
+ __func__,
+ (msg_type == ZEBRA_MLAG_PROCESS_UP) ? "UP" : "DOWN",
+ (client) ? "one" : "all");
+
+ if (client) {
+ s = stream_new(ZEBRA_HEADER_SIZE);
+ zclient_create_header(s, msg_type, VRF_DEFAULT);
+ zserv_send_message(client, s);
+ return;
+ }
+
+
+ /*
+ * additional four bytes are for mesasge type
+ */
+ s = stream_new(ZEBRA_HEADER_SIZE + ZEBRA_MLAG_METADATA_LEN);
+ stream_putl(s, ZEBRA_MLAG_MSG_BCAST);
+ zclient_create_header(s, msg_type, VRF_DEFAULT);
+ thread_add_event(zrouter.master, zebra_mlag_post_data_from_main_thread,
+ s, 0, NULL);
+}
+
+/**************************End of Multi-entrant Apis**************************/
+
+/***********************Zebra Main thread processing**************************/
+
+/*
+ * To avoid data corruption, messages will be post to clients only from
+ * main thread, because for that access was needed for clients list.
+ * so instead of forcing the locks, messages will be posted from main thread.
+ */
+static int zebra_mlag_post_data_from_main_thread(struct thread *thread)
+{
+ struct stream *s = THREAD_ARG(thread);
+ struct stream *zebra_s = NULL;
+ struct listnode *node;
+ struct zserv *client;
+ uint32_t msg_type = 0;
+ uint32_t msg_len = 0;
+
+ if (!s)
+ return -1;
+
+ STREAM_GETL(s, msg_type);
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(
+ "%s: Posting MLAG data for msg_type:0x%x to interested cleints",
+ __func__, msg_type);
+
+ msg_len = s->endp - ZEBRA_MLAG_METADATA_LEN;
+ for (ALL_LIST_ELEMENTS_RO(zrouter.client_list, node, client)) {
+ if (client->mlag_updates_interested == true) {
+ if (msg_type != ZEBRA_MLAG_MSG_BCAST
+ && !CHECK_FLAG(client->mlag_reg_mask1,
+ (1 << msg_type))) {
+ continue;
+ }
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(
+ "%s: Posting MLAG data of length-%d to client:%d ",
+ __func__, msg_len, client->proto);
+
+ zebra_s = stream_new(msg_len);
+ STREAM_GET(zebra_s->data, s, msg_len);
+ zebra_s->endp = msg_len;
+ stream_putw_at(zebra_s, 0, msg_len);
+
+ /*
+ * This stream will be enqueued to client_obuf, it will
+ * be freed after posting to client socket.
+ */
+ zserv_send_message(client, zebra_s);
+ zebra_s = NULL;
+ }
+ }
+
+ stream_free(s);
+ return 0;
+stream_failure:
+ stream_free(s);
+ if (zebra_s)
+ stream_free(zebra_s);
+ return 0;
+}
+
+/*
+ * Start the MLAG Thread, this will be used to write client data on to
+ * MLAG Process and to read the data from MLAG and post to cleints.
+ * when all clients are un-registered, this Thread will be
+ * suspended.
+ */
+static void zebra_mlag_spawn_pthread(void)
+{
+ /* Start MLAG write pthread */
+
+ struct frr_pthread_attr pattr = {.start =
+ frr_pthread_attr_default.start,
+ .stop = frr_pthread_attr_default.stop};
+
+ zrouter.mlag_info.zebra_pth_mlag =
+ frr_pthread_new(&pattr, "Zebra MLAG thread", "Zebra MLAG");
+
+ zrouter.mlag_info.th_master = zrouter.mlag_info.zebra_pth_mlag->master;
+
+
+ /* Enqueue an initial event for the dataplane pthread */
+ zebra_mlag_signal_write_thread();
+
+ frr_pthread_run(zrouter.mlag_info.zebra_pth_mlag, NULL);
+}
+
+/*
+ * all clients are un-registered for MLAG Updates, terminate the
+ * MLAG write thread
+ */
+static int zebra_mlag_terminate_pthread(struct thread *event)
+{
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("Zebra MLAG write thread terminate called");
+
+ if (zrouter.mlag_info.clients_interested_cnt) {
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(
+ "Zebra MLAG: still some clients are interested");
+ return 0;
+ }
+
+ frr_pthread_stop(zrouter.mlag_info.zebra_pth_mlag, NULL);
+
+ /* Destroy pthread */
+ frr_pthread_destroy(zrouter.mlag_info.zebra_pth_mlag);
+ zrouter.mlag_info.zebra_pth_mlag = NULL;
+ zrouter.mlag_info.th_master = NULL;
+ zrouter.mlag_info.t_read = NULL;
+ zrouter.mlag_info.t_write = NULL;
+
+ /*
+ * Send Notification to clean private data
+ */
+ zebra_mlag_private_cleanup_data();
+ return 0;
+}
+
+/*
+ * API to register zebra client for MLAG Updates
+ */
+void zebra_mlag_client_register(ZAPI_HANDLER_ARGS)
+{
+ struct stream *s;
+ uint32_t reg_mask = 0;
+ int rc = 0;
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("Received MLAG Registration from client-proto:%d",
+ client->proto);
+
+
+ /* Get input stream. */
+ s = msg;
+
+ /* Get data. */
+ STREAM_GETL(s, reg_mask);
+
+ if (client->mlag_updates_interested == true) {
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(
+ "Client is registered, existing mask: 0x%x, new mask: 0x%x",
+ client->mlag_reg_mask1, reg_mask);
+ if (client->mlag_reg_mask1 != reg_mask)
+ client->mlag_reg_mask1 = reg_mask;
+ /*
+ * Client might missed MLAG-UP Notification, post-it again
+ */
+ zebra_mlag_publish_process_state(client, ZEBRA_MLAG_PROCESS_UP);
+ return;
+ }
+
+
+ client->mlag_updates_interested = true;
+ client->mlag_reg_mask1 = reg_mask;
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("Registering for MLAG Updates with mask: 0x%x, ",
+ client->mlag_reg_mask1);
+
+ zrouter.mlag_info.clients_interested_cnt++;
+
+ if (zrouter.mlag_info.clients_interested_cnt == 1) {
+ /*
+ * First-client for MLAG Updates,open the communication channel
+ * with MLAG
+ */
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(
+ "First client, opening the channel with MLAG");
+
+ zebra_mlag_spawn_pthread();
+ rc = zebra_mlag_private_open_channel();
+ if (rc < 0) {
+ /*
+ * For some reason, zebra not able to open the
+ * comm-channel with MLAG, so post MLAG-DOWN to client.
+ * later when the channel is open, zebra will send
+ * MLAG-UP
+ */
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(
+ "Fail to open channel with MLAG,rc:%d, post Proto-down",
+ rc);
+ zebra_mlag_publish_process_state(
+ client, ZEBRA_MLAG_PROCESS_DOWN);
+ }
+ }
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("Client Registered successfully for MLAG Updates");
+
+ if (zrouter.mlag_info.connected == true)
+ zebra_mlag_publish_process_state(client, ZEBRA_MLAG_PROCESS_UP);
+stream_failure:
+ return;
+}
+
+/*
+ * API to un-register for MLAG Updates
+ */
+void zebra_mlag_client_unregister(ZAPI_HANDLER_ARGS)
+{
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("Received MLAG De-Registration from client-proto:%d",
+ client->proto);
+
+ if (client->mlag_updates_interested == false)
+ /* Unexpected */
+ return;
+
+ client->mlag_updates_interested = false;
+ client->mlag_reg_mask1 = 0;
+ zrouter.mlag_info.clients_interested_cnt--;
+
+ if (zrouter.mlag_info.clients_interested_cnt == 0) {
+ /*
+ * No-client is interested for MLAG Updates,close the
+ * communication channel with MLAG
+ */
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("Last client for MLAG, close the channel ");
+
+ /*
+ * Clean up flow:
+ * =============
+ * 1) main thread calls socket close which posts De-register
+ * to MLAG write thread
+ * 2) after MLAG write thread posts De-register it sends a
+ * signal back to main thread to do the thread cleanup
+ * this was mainly to make sure De-register is posted to MCLAGD.
+ */
+ zebra_mlag_private_close_channel();
+ }
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(
+ "Client De-Registered successfully for MLAG Updates");
+}
+
+/*
+ * Does following things.
+ * 1) allocated new local stream, and copies the client data and enqueue
+ * to MLAG Thread
+ * 2) MLAG Thread after dequeing, encode the client data using protobuf
+ * and write on to MLAG
+ */
+void zebra_mlag_forward_client_msg(ZAPI_HANDLER_ARGS)
+{
+ struct stream *zebra_s;
+ struct stream *mlag_s;
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("Received Client MLAG Data from client-proto:%d",
+ client->proto);
+
+ /* Get input stream. */
+ zebra_s = msg;
+ mlag_s = stream_new(zebra_s->endp);
+
+ /*
+ * Client data is | Zebra Header + MLAG Data |
+ * we need to enqueue only the MLAG data, skipping Zebra Header
+ */
+ stream_put(mlag_s, zebra_s->data + zebra_s->getp,
+ STREAM_READABLE(zebra_s));
+ stream_fifo_push_safe(zrouter.mlag_info.mlag_fifo, mlag_s);
+ zebra_mlag_signal_write_thread();
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("%s: Enqueued Client:%d data to MLAG Thread ",
+ __func__, client->proto);
+}
+
+/***********************End of Zebra Main thread processing*************/
+
enum mlag_role zebra_mlag_get_role(void)
{
return zrouter.mlag_info.role;
return CMD_SUCCESS;
}
-DEFPY_HIDDEN (test_mlag,
- test_mlag_cmd,
- "test zebra mlag <none$none|primary$primary|secondary$secondary>",
- "Test code\n"
- ZEBRA_STR
- "Modify the Mlag state\n"
- "Mlag is not setup on the machine\n"
- "Mlag is setup to be primary\n"
- "Mlag is setup to be the secondary\n")
+DEFPY(test_mlag, test_mlag_cmd,
+ "test zebra mlag <none$none|primary$primary|secondary$secondary>",
+ "Test code\n" ZEBRA_STR
+ "Modify the Mlag state\n"
+ "Mlag is not setup on the machine\n"
+ "Mlag is setup to be primary\n"
+ "Mlag is setup to be the secondary\n")
{
enum mlag_role orig = zrouter.mlag_info.role;
char buf1[80], buf2[80];
mlag_role2str(orig, buf1, sizeof(buf1)),
mlag_role2str(orig, buf2, sizeof(buf2)));
- if (orig != zrouter.mlag_info.role)
+ if (orig != zrouter.mlag_info.role) {
zsend_capabilities_all_clients();
+ if (zrouter.mlag_info.role != MLAG_ROLE_NONE) {
+ if (zrouter.mlag_info.clients_interested_cnt == 0
+ && test_mlag_in_progress == false) {
+ if (zrouter.mlag_info.zebra_pth_mlag == NULL)
+ zebra_mlag_spawn_pthread();
+ zrouter.mlag_info.clients_interested_cnt++;
+ test_mlag_in_progress = true;
+ zebra_mlag_private_open_channel();
+ }
+ } else {
+ if (test_mlag_in_progress == true) {
+ test_mlag_in_progress = false;
+ zrouter.mlag_info.clients_interested_cnt--;
+ zebra_mlag_private_close_channel();
+ }
+ }
+ }
return CMD_SUCCESS;
}
{
install_element(VIEW_NODE, &show_mlag_cmd);
install_element(ENABLE_NODE, &test_mlag_cmd);
+
+ /*
+ * Intialiaze teh MLAG Global variableis
+ * write thread will be craeted during actual registration with MCLAG
+ */
+ zrouter.mlag_info.clients_interested_cnt = 0;
+ zrouter.mlag_info.connected = false;
+ zrouter.mlag_info.timer_running = false;
+ zrouter.mlag_info.mlag_fifo = stream_fifo_new();
+ zrouter.mlag_info.zebra_pth_mlag = NULL;
+ zrouter.mlag_info.th_master = NULL;
+ zrouter.mlag_info.t_read = NULL;
+ zrouter.mlag_info.t_write = NULL;
+ test_mlag_in_progress = false;
+ zebra_mlag_reset_write_buffer();
+ zebra_mlag_reset_read_buffer();
}
void zebra_mlag_terminate(void)
{
}
+
+
+/*
+ *
+ * ProtoBuf Encoding APIs
+ */
+
+int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type)
+{
+ return 0;
+}
+
+int zebra_mlag_protobuf_decode_message(struct stream **s, uint8_t *data,
+ uint32_t len)
+{
+ return 0;
+}
--- /dev/null
+/*
+ * This is an implementation of MLAG Functionality
+ *
+ * Module name: Zebra MLAG
+ *
+ * Author: sathesh Kumar karra <sathk@cumulusnetworks.com>
+ *
+ * Copyright (C) 2019 Cumulus Networks http://www.cumulusnetworks.com
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; either version 2 of the License, or (at your option)
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; see the file COPYING; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+#include "zebra.h"
+
+#include "hook.h"
+#include "module.h"
+#include "thread.h"
+#include "libfrr.h"
+#include "version.h"
+#include "network.h"
+
+#include "lib/stream.h"
+
+#include "zebra/debug.h"
+#include "zebra/zebra_router.h"
+#include "zebra/zebra_mlag.h"
+#include "zebra/zebra_mlag_private.h"
+
+#include <sys/un.h>
+
+
+/*
+ * This file will have platform specific apis to communicate with MCLAG.
+ *
+ */
+
+#ifdef HAVE_CUMULUS
+
+static struct thread_master *zmlag_master;
+static int mlag_socket;
+
+static int zebra_mlag_connect(struct thread *thread);
+static int zebra_mlag_read(struct thread *thread);
+
+/*
+ * Write the data to MLAGD
+ */
+int zebra_mlag_private_write_data(uint8_t *data, uint32_t len)
+{
+ int rc = 0;
+
+ if (IS_ZEBRA_DEBUG_MLAG) {
+ zlog_debug("%s: Writing %d length Data to clag", __func__, len);
+ zlog_hexdump(data, len);
+ }
+ rc = write(mlag_socket, data, len);
+ return rc;
+}
+
+static void zebra_mlag_sched_read(void)
+{
+ thread_add_read(zmlag_master, zebra_mlag_read, NULL, mlag_socket,
+ &zrouter.mlag_info.t_read);
+}
+
+static int zebra_mlag_read(struct thread *thread)
+{
+ uint32_t *msglen;
+ uint32_t h_msglen;
+ uint32_t tot_len, curr_len = mlag_rd_buf_offset;
+
+ zrouter.mlag_info.t_read = NULL;
+
+ /*
+ * Received message in sock_stream looks like below
+ * | len-1 (4 Bytes) | payload-1 (len-1) |
+ * len-2 (4 Bytes) | payload-2 (len-2) | ..
+ *
+ * Idea is read one message completely, then process, until message is
+ * read completely, keep on reading from the socket
+ */
+ if (curr_len < ZEBRA_MLAG_LEN_SIZE) {
+ ssize_t data_len;
+
+ data_len = read(mlag_socket, mlag_rd_buffer + curr_len,
+ ZEBRA_MLAG_LEN_SIZE - curr_len);
+ if (data_len == 0 || data_len == -1) {
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("MLAG connection closed socket : %d",
+ mlag_socket);
+ close(mlag_socket);
+ zebra_mlag_handle_process_state(MLAG_DOWN);
+ return -1;
+ }
+ if (data_len != (ssize_t)ZEBRA_MLAG_LEN_SIZE - curr_len) {
+ /* Try again later */
+ zebra_mlag_sched_read();
+ return 0;
+ }
+ curr_len = ZEBRA_MLAG_LEN_SIZE;
+ }
+
+ /* Get the actual packet length */
+ msglen = (uint32_t *)mlag_rd_buffer;
+ h_msglen = ntohl(*msglen);
+
+ /* This will be the actual length of the packet */
+ tot_len = h_msglen + ZEBRA_MLAG_LEN_SIZE;
+
+ if (curr_len < tot_len) {
+ ssize_t data_len;
+
+ data_len = read(mlag_socket, mlag_rd_buffer + curr_len,
+ tot_len - curr_len);
+ if (data_len == 0 || data_len == -1) {
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("MLAG connection closed socket : %d",
+ mlag_socket);
+ close(mlag_socket);
+ zebra_mlag_handle_process_state(MLAG_DOWN);
+ return -1;
+ }
+ if (data_len != (ssize_t)tot_len - curr_len) {
+ /* Try again later */
+ zebra_mlag_sched_read();
+ return 0;
+ }
+ }
+
+ if (IS_ZEBRA_DEBUG_MLAG) {
+ zlog_debug("Received a MLAG Message from socket: %d, len:%u ",
+ mlag_socket, tot_len);
+ zlog_hexdump(mlag_rd_buffer, tot_len);
+ }
+
+ tot_len -= ZEBRA_MLAG_LEN_SIZE;
+
+ /* Process the packet */
+ zebra_mlag_process_mlag_data(mlag_rd_buffer + ZEBRA_MLAG_LEN_SIZE,
+ tot_len);
+
+ /* Register read thread. */
+ zebra_mlag_reset_read_buffer();
+ zebra_mlag_sched_read();
+ return 0;
+}
+
+static int zebra_mlag_connect(struct thread *thread)
+{
+ struct sockaddr_un svr = {0};
+ struct ucred ucred;
+ socklen_t len = 0;
+
+ /* Reset the Timer-running flag */
+ zrouter.mlag_info.timer_running = false;
+
+ /* Reset, sothat Next task can be scheduled */
+ zrouter.mlag_info.t_read = NULL;
+ svr.sun_family = AF_UNIX;
+#define MLAG_SOCK_NAME "/var/run/clag-zebra.socket"
+ strlcpy(svr.sun_path, MLAG_SOCK_NAME, sizeof(MLAG_SOCK_NAME) + 1);
+
+ mlag_socket = socket(svr.sun_family, SOCK_STREAM, 0);
+ if (mlag_socket < 0)
+ return -1;
+
+ if (connect(mlag_socket, (struct sockaddr *)&svr, sizeof(svr)) == -1) {
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(
+ "Unable to connect to %s try again in 10 secs",
+ svr.sun_path);
+ close(mlag_socket);
+ zrouter.mlag_info.timer_running = true;
+ thread_add_timer(zmlag_master, zebra_mlag_connect, NULL, 10,
+ &zrouter.mlag_info.t_read);
+ return 0;
+ }
+ len = sizeof(struct ucred);
+ ucred.pid = getpid();
+
+ set_nonblocking(mlag_socket);
+ setsockopt(mlag_socket, SOL_SOCKET, SO_PEERCRED, &ucred, len);
+
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("%s: Connection with MLAG is established ",
+ __func__);
+
+ thread_add_read(zmlag_master, zebra_mlag_read, NULL, mlag_socket,
+ &zrouter.mlag_info.t_read);
+ /*
+ * Connection is established with MLAGD, post to clients
+ */
+ zebra_mlag_handle_process_state(MLAG_UP);
+ return 0;
+}
+
+/*
+ * Currently we are doing polling later we will look for better options
+ */
+void zebra_mlag_private_monitor_state(void)
+{
+ thread_add_event(zmlag_master, zebra_mlag_connect, NULL, 0,
+ &zrouter.mlag_info.t_read);
+}
+
+int zebra_mlag_private_open_channel(void)
+{
+ zmlag_master = zrouter.mlag_info.th_master;
+
+ if (zrouter.mlag_info.connected == true) {
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("%s: Zebra already connected to MLAGD",
+ __func__);
+ return 0;
+ }
+
+ if (zrouter.mlag_info.timer_running == true) {
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug(
+ "%s: Connection retry is in progress for MLAGD",
+ __func__);
+ return 0;
+ }
+
+ if (zrouter.mlag_info.clients_interested_cnt) {
+ /*
+ * Connect only if any clients are showing interest
+ */
+ thread_add_event(zmlag_master, zebra_mlag_connect, NULL, 0,
+ &zrouter.mlag_info.t_read);
+ }
+ return 0;
+}
+
+int zebra_mlag_private_close_channel(void)
+{
+ if (zmlag_master == NULL)
+ return -1;
+
+ if (zrouter.mlag_info.clients_interested_cnt) {
+ if (IS_ZEBRA_DEBUG_MLAG)
+ zlog_debug("%s: still %d clients are connected, skip",
+ __func__,
+ zrouter.mlag_info.clients_interested_cnt);
+ return -1;
+ }
+
+ /*
+ * Post the De-register to MLAG, so that it can do necesasry cleanup
+ */
+ zebra_mlag_send_deregister();
+
+ return 0;
+}
+
+void zebra_mlag_private_cleanup_data(void)
+{
+ zmlag_master = NULL;
+ zrouter.mlag_info.connected = false;
+ zrouter.mlag_info.timer_running = false;
+
+ close(mlag_socket);
+}
+
+#else /*HAVE_CUMULUS */
+
+int zebra_mlag_private_write_data(uint8_t *data, uint32_t len)
+{
+ return 0;
+}
+
+void zebra_mlag_private_monitor_state(void)
+{
+}
+
+int zebra_mlag_private_open_channel(void)
+{
+ return 0;
+}
+
+int zebra_mlag_private_close_channel(void)
+{
+ return 0;
+}
+
+void zebra_mlag_private_cleanup_data(void)
+{
+}
+#endif /*HAVE_CUMULUS*/