diff options
| author | Renato Westphal <renato@opensourcerouting.org> | 2019-01-25 18:54:16 -0200 | 
|---|---|---|
| committer | Renato Westphal <renato@opensourcerouting.org> | 2019-04-26 18:15:32 -0300 | 
| commit | ec2ac5f28a83c39b2df02279482494129ddaea28 (patch) | |
| tree | a3a52ab9d5e4d47c810b275b54c61dbdb7ece835 /lib/northbound_grpc.cpp | |
| parent | 83981138fe8c1e0a40b8dede74eca65449dda5de (diff) | |
lib: add new gRPC-based northbound plugin
This is an experimental plugin for now. Full documentation will
come later.
Signed-off-by: Renato Westphal <renato@opensourcerouting.org>
Diffstat (limited to 'lib/northbound_grpc.cpp')
| -rw-r--r-- | lib/northbound_grpc.cpp | 936 | 
1 files changed, 936 insertions, 0 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp new file mode 100644 index 0000000000..a55da23dd1 --- /dev/null +++ b/lib/northbound_grpc.cpp @@ -0,0 +1,936 @@ +// +// Copyright (C) 2019  NetDEF, Inc. +//                     Renato Westphal +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation; either version 2 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; see the file COPYING; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +// + +#include <zebra.h> + +#include "log.h" +#include "libfrr.h" +#include "version.h" +#include "command.h" +#include "lib_errors.h" +#include "northbound.h" +#include "northbound_db.h" + +#include <iostream> +#include <sstream> +#include <memory> +#include <string> + +#include <grpcpp/grpcpp.h> +#include "grpc/frr-northbound.grpc.pb.h" + +#define GRPC_DEFAULT_PORT 50051 + +/* + * NOTE: we can't use the FRR debugging infrastructure here since it uses + * atomics and C++ has a different atomics API. Enable gRPC debugging + * unconditionally until we figure out a way to solve this problem. + */ +static bool nb_dbg_client_grpc = 1; + +static pthread_t grpc_pthread; + +class NorthboundImpl final : public frr::Northbound::Service +{ +      public: +	NorthboundImpl(void) +	{ +		_nextCandidateId = 0; +	} + +	~NorthboundImpl(void) +	{ +		// Delete candidates. +		for (auto it = _candidates.begin(); it != _candidates.end(); +		     it++) +			delete_candidate(&it->second); +	} + +	grpc::Status +	GetCapabilities(grpc::ServerContext *context, +			frr::GetCapabilitiesRequest const *request, +			frr::GetCapabilitiesResponse *response) override +	{ +		if (nb_dbg_client_grpc) +			zlog_debug("received RPC GetCapabilities()"); + +		// Response: string frr_version = 1; +		response->set_frr_version(FRR_VERSION); + +		// Response: bool rollback_support = 2; +#ifdef HAVE_CONFIG_ROLLBACKS +		response->set_rollback_support(true); +#else +		response->set_rollback_support(false); +#endif + +		// Response: repeated ModuleData supported_modules = 3; +		struct yang_module *module; +		RB_FOREACH (module, yang_modules, &yang_modules) { +			auto m = response->add_supported_modules(); + +			m->set_name(module->name); +			if (module->info->rev_size) +				m->set_revision(module->info->rev[0].date); +			m->set_organization(module->info->org); +		} + +		// Response: repeated Encoding supported_encodings = 4; +		response->add_supported_encodings(frr::JSON); +		response->add_supported_encodings(frr::XML); + +		return grpc::Status::OK; +	} + +	grpc::Status Get(grpc::ServerContext *context, +			 frr::GetRequest const *request, +			 grpc::ServerWriter<frr::GetResponse> *writer) override +	{ +		// Request: DataType type = 1; +		int type = request->type(); +		// Request: Encoding encoding = 2; +		frr::Encoding encoding = request->encoding(); +		// Request: bool with_defaults = 3; +		bool with_defaults = request->with_defaults(); + +		if (nb_dbg_client_grpc) +			zlog_debug( +				"received RPC Get(type: %u, encoding: %u, with_defaults: %u)", +				type, encoding, with_defaults); + +		// Request: repeated string path = 4; +		auto paths = request->path(); +		for (const std::string &path : paths) { +			frr::GetResponse response; +			grpc::Status status; + +			// Response: int64 timestamp = 1; +			response.set_timestamp(time(NULL)); + +			// Response: DataTree data = 2; +			auto *data = response.mutable_data(); +			data->set_encoding(request->encoding()); +			status = get_path(data, path, type, +					  encoding2lyd_format(encoding), +					  with_defaults); + +			// Something went wrong... +			if (!status.ok()) +				return status; + +			writer->Write(response); +		} + +		if (nb_dbg_client_grpc) +			zlog_debug("received RPC Get() end"); + +		return grpc::Status::OK; +	} + +	grpc::Status +	CreateCandidate(grpc::ServerContext *context, +			frr::CreateCandidateRequest const *request, +			frr::CreateCandidateResponse *response) override +	{ +		if (nb_dbg_client_grpc) +			zlog_debug("received RPC CreateCandidate()"); + +		struct candidate *candidate = create_candidate(); +		if (!candidate) +			return grpc::Status( +				grpc::StatusCode::RESOURCE_EXHAUSTED, +				"Can't create candidate configuration"); + +		// Response: uint32 candidate_id = 1; +		response->set_candidate_id(candidate->id); + +		return grpc::Status::OK; +	} + +	grpc::Status +	DeleteCandidate(grpc::ServerContext *context, +			frr::DeleteCandidateRequest const *request, +			frr::DeleteCandidateResponse *response) override +	{ +		// Request: uint32 candidate_id = 1; +		uint32_t candidate_id = request->candidate_id(); + +		if (nb_dbg_client_grpc) +			zlog_debug( +				"received RPC DeleteCandidate(candidate_id: %u)", +				candidate_id); + +		struct candidate *candidate = get_candidate(candidate_id); +		if (!candidate) +			return grpc::Status( +				grpc::StatusCode::NOT_FOUND, +				"candidate configuration not found"); + +		delete_candidate(candidate); + +		return grpc::Status::OK; +	} + +	grpc::Status +	UpdateCandidate(grpc::ServerContext *context, +			frr::UpdateCandidateRequest const *request, +			frr::UpdateCandidateResponse *response) override +	{ +		// Request: uint32 candidate_id = 1; +		uint32_t candidate_id = request->candidate_id(); + +		if (nb_dbg_client_grpc) +			zlog_debug( +				"received RPC UpdateCandidate(candidate_id: %u)", +				candidate_id); + +		struct candidate *candidate = get_candidate(candidate_id); +		if (!candidate) +			return grpc::Status( +				grpc::StatusCode::NOT_FOUND, +				"candidate configuration not found"); + +		if (candidate->transaction) +			return grpc::Status( +				grpc::StatusCode::FAILED_PRECONDITION, +				"candidate is in the middle of a transaction"); + +		if (nb_candidate_update(candidate->config) != NB_OK) +			return grpc::Status( +				grpc::StatusCode::INTERNAL, +				"failed to update candidate configuration"); + +		return grpc::Status::OK; +	} + +	grpc::Status +	EditCandidate(grpc::ServerContext *context, +		      frr::EditCandidateRequest const *request, +		      frr::EditCandidateResponse *response) override +	{ +		// Request: uint32 candidate_id = 1; +		uint32_t candidate_id = request->candidate_id(); + +		if (nb_dbg_client_grpc) +			zlog_debug( +				"received RPC EditCandidate(candidate_id: %u)", +				candidate_id); + +		struct candidate *candidate = get_candidate(candidate_id); +		if (!candidate) +			return grpc::Status( +				grpc::StatusCode::NOT_FOUND, +				"candidate configuration not found"); + +		// Create a copy of the candidate. For consistency, we need to +		// ensure that either all changes are accepted or none are (in +		// the event of an error). +		struct nb_config *candidate_tmp = +			nb_config_dup(candidate->config); + +		auto pvs = request->update(); +		for (const frr::PathValue &pv : pvs) { +			if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), +					    pv.value()) +			    != 0) { +				nb_config_free(candidate_tmp); +				return grpc::Status( +					grpc::StatusCode::INVALID_ARGUMENT, +					"Failed to update \"" + pv.path() +						+ "\""); +			} +		} + +		pvs = request->delete_(); +		for (const frr::PathValue &pv : pvs) { +			if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) +			    != 0) { +				nb_config_free(candidate_tmp); +				return grpc::Status( +					grpc::StatusCode::INVALID_ARGUMENT, +					"Failed to remove \"" + pv.path() +						+ "\""); +			} +		} + +		// No errors, accept all changes. +		nb_config_replace(candidate->config, candidate_tmp, false); + +		return grpc::Status::OK; +	} + +	grpc::Status +	LoadToCandidate(grpc::ServerContext *context, +			frr::LoadToCandidateRequest const *request, +			frr::LoadToCandidateResponse *response) override +	{ +		// Request: uint32 candidate_id = 1; +		uint32_t candidate_id = request->candidate_id(); +		// Request: LoadType type = 2; +		int load_type = request->type(); +		// Request: DataTree config = 3; +		auto config = request->config(); + +		if (nb_dbg_client_grpc) +			zlog_debug( +				"received RPC LoadToCandidate(candidate_id: %u)", +				candidate_id); + +		struct candidate *candidate = get_candidate(candidate_id); +		if (!candidate) +			return grpc::Status( +				grpc::StatusCode::NOT_FOUND, +				"candidate configuration not found"); + +		struct lyd_node *dnode = dnode_from_data_tree(&config, true); +		if (!dnode) +			return grpc::Status( +				grpc::StatusCode::INTERNAL, +				"Failed to parse the configuration"); + +		struct nb_config *loaded_config = nb_config_new(dnode); + +		if (load_type == frr::LoadToCandidateRequest::REPLACE) +			nb_config_replace(candidate->config, loaded_config, +					  false); +		else if (nb_config_merge(candidate->config, loaded_config, +					 false) +			 != NB_OK) +			return grpc::Status( +				grpc::StatusCode::INTERNAL, +				"Failed to merge the loaded configuration"); + +		return grpc::Status::OK; +	} + +	grpc::Status Commit(grpc::ServerContext *context, +			    frr::CommitRequest const *request, +			    frr::CommitResponse *response) override +	{ +		// Request: uint32 candidate_id = 1; +		uint32_t candidate_id = request->candidate_id(); +		// Request: Phase phase = 2; +		int phase = request->phase(); +		// Request: string comment = 3; +		const std::string comment = request->comment(); + +		if (nb_dbg_client_grpc) +			zlog_debug("received RPC Commit(candidate_id: %u)", +				   candidate_id); + +		// Find candidate configuration. +		struct candidate *candidate = get_candidate(candidate_id); +		if (!candidate) +			return grpc::Status( +				grpc::StatusCode::NOT_FOUND, +				"candidate configuration not found"); + +		int ret = NB_OK; +		uint32_t transaction_id = 0; + +		// Check for misuse of the two-phase commit protocol. +		switch (phase) { +		case frr::CommitRequest::PREPARE: +		case frr::CommitRequest::ALL: +			if (candidate->transaction) +				return grpc::Status( +					grpc::StatusCode::FAILED_PRECONDITION, +					"pending transaction in progress"); +			break; +		case frr::CommitRequest::ABORT: +		case frr::CommitRequest::APPLY: +			if (!candidate->transaction) +				return grpc::Status( +					grpc::StatusCode::FAILED_PRECONDITION, +					"no transaction in progress"); +			break; +		default: +			break; +		} + +		// Execute the user request. +		switch (phase) { +		case frr::CommitRequest::VALIDATE: +			ret = nb_candidate_validate(candidate->config); +			break; +		case frr::CommitRequest::PREPARE: +			ret = nb_candidate_commit_prepare( +				candidate->config, NB_CLIENT_GRPC, NULL, +				comment.c_str(), &candidate->transaction); +			break; +		case frr::CommitRequest::ABORT: +			nb_candidate_commit_abort(candidate->transaction); +			break; +		case frr::CommitRequest::APPLY: +			nb_candidate_commit_apply(candidate->transaction, true, +						  &transaction_id); +			break; +		case frr::CommitRequest::ALL: +			ret = nb_candidate_commit( +				candidate->config, NB_CLIENT_GRPC, NULL, true, +				comment.c_str(), &transaction_id); +			break; +		} + +		// Map northbound error codes to gRPC error codes. +		switch (ret) { +		case NB_ERR_NO_CHANGES: +			return grpc::Status( +				grpc::StatusCode::ABORTED, +				"No configuration changes detected"); +		case NB_ERR_LOCKED: +			return grpc::Status( +				grpc::StatusCode::UNAVAILABLE, +				"There's already a transaction in progress"); +		case NB_ERR_VALIDATION: +			return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, +					    "Validation error"); +		case NB_ERR_RESOURCE: +			return grpc::Status( +				grpc::StatusCode::RESOURCE_EXHAUSTED, +				"Failed do allocate resources"); +		case NB_ERR: +			return grpc::Status(grpc::StatusCode::INTERNAL, +					    "Internal error"); +		default: +			break; +		} + +		// Response: uint32 transaction_id = 1; +		if (transaction_id) +			response->set_transaction_id(transaction_id); + +		return grpc::Status::OK; +	} + +	grpc::Status +	ListTransactions(grpc::ServerContext *context, +			 frr::ListTransactionsRequest const *request, +			 grpc::ServerWriter<frr::ListTransactionsResponse> +				 *writer) override +	{ +		if (nb_dbg_client_grpc) +			zlog_debug("received RPC ListTransactions()"); + +		nb_db_transactions_iterate(list_transactions_cb, writer); + +		return grpc::Status::OK; +	} + +	grpc::Status +	GetTransaction(grpc::ServerContext *context, +		       frr::GetTransactionRequest const *request, +		       frr::GetTransactionResponse *response) override +	{ +		struct nb_config *nb_config; + +		// Request: uint32 transaction_id = 1; +		uint32_t transaction_id = request->transaction_id(); +		// Request: Encoding encoding = 2; +		frr::Encoding encoding = request->encoding(); +		// Request: bool with_defaults = 3; +		bool with_defaults = request->with_defaults(); + +		if (nb_dbg_client_grpc) +			zlog_debug( +				"received RPC GetTransaction(transaction_id: %u, encoding: %u)", +				transaction_id, encoding); + +		// Load configuration from the transactions database. +		nb_config = nb_db_transaction_load(transaction_id); +		if (!nb_config) +			return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, +					    "Transaction not found"); + +		// Response: DataTree config = 1; +		auto config = response->mutable_config(); +		config->set_encoding(encoding); + +		// Dump data using the requested format. +		if (data_tree_from_dnode(config, nb_config->dnode, +					 encoding2lyd_format(encoding), +					 with_defaults) +		    != 0) { +			nb_config_free(nb_config); +			return grpc::Status(grpc::StatusCode::INTERNAL, +					    "Failed to dump data"); +		} + +		nb_config_free(nb_config); + +		return grpc::Status::OK; +	} + +	grpc::Status LockConfig(grpc::ServerContext *context, +				frr::LockConfigRequest const *request, +				frr::LockConfigResponse *response) override +	{ +		if (nb_dbg_client_grpc) +			zlog_debug("received RPC LockConfig()"); + +		if (nb_running_lock(NB_CLIENT_GRPC, NULL)) +			return grpc::Status( +				grpc::StatusCode::FAILED_PRECONDITION, +				"running configuration is locked already"); + +		return grpc::Status::OK; +	} + +	grpc::Status UnlockConfig(grpc::ServerContext *context, +				  frr::UnlockConfigRequest const *request, +				  frr::UnlockConfigResponse *response) override +	{ +		if (nb_dbg_client_grpc) +			zlog_debug("received RPC UnlockConfig()"); + +		if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) +			return grpc::Status( +				grpc::StatusCode::FAILED_PRECONDITION, +				"failed to unlock the running configuration"); + +		return grpc::Status::OK; +	} + +	grpc::Status Execute(grpc::ServerContext *context, +			     frr::ExecuteRequest const *request, +			     frr::ExecuteResponse *response) override +	{ +		struct nb_node *nb_node; +		struct list *input_list; +		struct list *output_list; +		struct listnode *node; +		struct yang_data *data; +		const char *xpath; + +		// Request: string path = 1; +		xpath = request->path().c_str(); + +		if (nb_dbg_client_grpc) +			zlog_debug("received RPC Execute(path: \"%s\")", xpath); + +		if (request->path().empty()) +			return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, +					    "Data path is empty"); + +		nb_node = nb_node_find(xpath); +		if (!nb_node) +			return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, +					    "Unknown data path"); + +		input_list = yang_data_list_new(); +		output_list = yang_data_list_new(); + +		// Read input parameters. +		auto input = request->input(); +		for (const frr::PathValue &pv : input) { +			// Request: repeated PathValue input = 2; +			data = yang_data_new(pv.path().c_str(), +					     pv.value().c_str()); +			listnode_add(input_list, data); +		} + +		// Execute callback registered for this XPath. +		if (nb_node->cbs.rpc(xpath, input_list, output_list) != NB_OK) { +			flog_warn(EC_LIB_NB_CB_RPC, +				  "%s: rpc callback failed: %s", __func__, +				  xpath); +			list_delete(&input_list); +			list_delete(&output_list); +			return grpc::Status(grpc::StatusCode::INTERNAL, +					    "RPC failed"); +		} + +		// Process output parameters. +		for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { +			// Response: repeated PathValue output = 1; +			frr::PathValue *pv = response->add_output(); +			pv->set_path(data->xpath); +			pv->set_value(data->value); +		} + +		// Release memory. +		list_delete(&input_list); +		list_delete(&output_list); + +		return grpc::Status::OK; +	} + +      private: +	struct candidate { +		uint32_t id; +		struct nb_config *config; +		struct nb_transaction *transaction; +	}; +	std::map<uint32_t, struct candidate> _candidates; +	uint32_t _nextCandidateId; + +	static int yang_dnode_edit(struct lyd_node *dnode, +				   const std::string &path, +				   const std::string &value) +	{ +		ly_errno = LY_SUCCESS; +		dnode = lyd_new_path(dnode, ly_native_ctx, path.c_str(), +				     (void *)value.c_str(), +				     (LYD_ANYDATA_VALUETYPE)0, +				     LYD_PATH_OPT_UPDATE); +		if (!dnode && ly_errno != LY_SUCCESS) { +			flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed", +				  __func__); +			return -1; +		} + +		return 0; +	} + +	static int yang_dnode_delete(struct lyd_node *dnode, +				     const std::string &path) +	{ +		dnode = yang_dnode_get(dnode, path.c_str()); +		if (!dnode) +			return -1; + +		lyd_free(dnode); + +		return 0; +	} + +	static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding) +	{ +		switch (encoding) { +		case frr::JSON: +			return LYD_JSON; +		case frr::XML: +			return LYD_XML; +		} +	} + +	static int get_oper_data_cb(const struct lys_node *snode, +				    struct yang_translator *translator, +				    struct yang_data *data, void *arg) +	{ +		struct lyd_node *dnode = static_cast<struct lyd_node *>(arg); +		int ret = yang_dnode_edit(dnode, data->xpath, data->value); +		yang_data_free(data); + +		return (ret == 0) ? NB_OK : NB_ERR; +	} + +	static void list_transactions_cb(void *arg, int transaction_id, +					 const char *client_name, +					 const char *date, const char *comment) +	{ +		grpc::ServerWriter<frr::ListTransactionsResponse> *writer = +			static_cast<grpc::ServerWriter< +				frr::ListTransactionsResponse> *>(arg); +		frr::ListTransactionsResponse response; + +		// Response: uint32 id = 1; +		response.set_id(transaction_id); + +		// Response: string client = 2; +		response.set_client(client_name); + +		// Response: string date = 3; +		response.set_date(date); + +		// Response: string comment = 4; +		response.set_comment(comment); + +		writer->Write(response); +	} + +	static int data_tree_from_dnode(frr::DataTree *dt, +					const struct lyd_node *dnode, +					LYD_FORMAT lyd_format, +					bool with_defaults) +	{ +		char *strp; +		int options = 0; + +		SET_FLAG(options, LYP_FORMAT | LYP_WITHSIBLINGS); +		if (with_defaults) +			SET_FLAG(options, LYP_WD_ALL); +		else +			SET_FLAG(options, LYP_WD_TRIM); + +		if (lyd_print_mem(&strp, dnode, lyd_format, options) == 0) { +			if (strp) { +				dt->set_data(strp); +				free(strp); +			} +			return 0; +		} + +		return -1; +	} + +	static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt, +						     bool config_only) +	{ +		struct lyd_node *dnode; +		int options; + +		if (config_only) +			options = LYD_OPT_CONFIG; +		else +			options = LYD_OPT_DATA | LYD_OPT_DATA_NO_YANGLIB; + +		dnode = lyd_parse_mem(ly_native_ctx, dt->data().c_str(), +				      encoding2lyd_format(dt->encoding()), +				      options); + +		return dnode; +	} + +	static struct lyd_node *get_dnode_config(const std::string &path) +	{ +		struct lyd_node *dnode; + +		pthread_rwlock_rdlock(&running_config->lock); +		{ +			dnode = yang_dnode_get(running_config->dnode, +					       path.empty() ? NULL +							    : path.c_str()); +			if (dnode) +				dnode = yang_dnode_dup(dnode); +		} +		pthread_rwlock_unlock(&running_config->lock); + +		return dnode; +	} + +	static struct lyd_node *get_dnode_state(const std::string &path) +	{ +		struct lyd_node *dnode; + +		dnode = yang_dnode_new(ly_native_ctx, false); +		if (nb_oper_data_iterate(path.c_str(), NULL, 0, +					 get_oper_data_cb, dnode) +		    != NB_OK) { +			yang_dnode_free(dnode); +			return NULL; +		} + +		return dnode; +	} + +	static grpc::Status get_path(frr::DataTree *dt, const std::string &path, +				     int type, LYD_FORMAT lyd_format, +				     bool with_defaults) +	{ +		struct lyd_node *dnode_config = NULL; +		struct lyd_node *dnode_state = NULL; +		struct lyd_node *dnode_final; + +		// Configuration data. +		if (type == frr::GetRequest_DataType_ALL +		    || type == frr::GetRequest_DataType_CONFIG) { +			dnode_config = get_dnode_config(path); +			if (!dnode_config) +				return grpc::Status( +					grpc::StatusCode::INVALID_ARGUMENT, +					"Data path not found"); +		} + +		// Operational data. +		if (type == frr::GetRequest_DataType_ALL +		    || type == frr::GetRequest_DataType_STATE) { +			dnode_state = get_dnode_state(path); +			if (!dnode_state) { +				if (dnode_config) +					yang_dnode_free(dnode_config); +				return grpc::Status( +					grpc::StatusCode::INVALID_ARGUMENT, +					"Failed to fetch operational data"); +			} +		} + +		switch (type) { +		case frr::GetRequest_DataType_ALL: +			// +			// Combine configuration and state data into a single +			// dnode. +			// +			if (lyd_merge(dnode_state, dnode_config, +				      LYD_OPT_EXPLICIT) +			    != 0) { +				yang_dnode_free(dnode_state); +				yang_dnode_free(dnode_config); +				return grpc::Status( +					grpc::StatusCode::INTERNAL, +					"Failed to merge configuration and state data"); +			} + +			dnode_final = dnode_state; +			break; +		case frr::GetRequest_DataType_CONFIG: +			dnode_final = dnode_config; +			break; +		case frr::GetRequest_DataType_STATE: +			dnode_final = dnode_state; +			break; +		} + +		// Validate data to create implicit default nodes if necessary. +		int validate_opts = 0; +		if (type == frr::GetRequest_DataType_CONFIG) +			validate_opts = LYD_OPT_CONFIG; +		else +			validate_opts = LYD_OPT_DATA | LYD_OPT_DATA_NO_YANGLIB; +		lyd_validate(&dnode_final, validate_opts, ly_native_ctx); + +		// Dump data using the requested format. +		int ret = data_tree_from_dnode(dt, dnode_final, lyd_format, +					       with_defaults); +		yang_dnode_free(dnode_final); +		if (ret != 0) +			return grpc::Status(grpc::StatusCode::INTERNAL, +					    "Failed to dump data"); + +		return grpc::Status::OK; +	} + +	struct candidate *create_candidate(void) +	{ +		uint32_t candidate_id = ++_nextCandidateId; + +		// Check for overflow. +		// TODO: implement an algorithm for unique reusable IDs. +		if (candidate_id == 0) +			return NULL; + +		struct candidate *candidate = &_candidates[candidate_id]; +		candidate->id = candidate_id; +		pthread_rwlock_rdlock(&running_config->lock); +		{ +			candidate->config = nb_config_dup(running_config); +		} +		pthread_rwlock_unlock(&running_config->lock); +		candidate->transaction = NULL; + +		return candidate; +	} + +	void delete_candidate(struct candidate *candidate) +	{ +		_candidates.erase(candidate->id); +		nb_config_free(candidate->config); +		if (candidate->transaction) +			nb_candidate_commit_abort(candidate->transaction); +	} + +	struct candidate *get_candidate(uint32_t candidate_id) +	{ +		struct candidate *candidate; + +		if (_candidates.count(candidate_id) == 0) +			return NULL; + +		return &_candidates[candidate_id]; +	} +}; + +static void *grpc_pthread_start(void *arg) +{ +	unsigned long *port = static_cast<unsigned long *>(arg); +	NorthboundImpl service; +	std::stringstream server_address; + +	server_address << "0.0.0.0:" << *port; + +	grpc::ServerBuilder builder; +	builder.AddListeningPort(server_address.str(), +				 grpc::InsecureServerCredentials()); +	builder.RegisterService(&service); + +	std::unique_ptr<grpc::Server> server(builder.BuildAndStart()); + +	zlog_notice("gRPC server listening on %s", +		    server_address.str().c_str()); + +	server->Wait(); + +	return NULL; +} + +static int frr_grpc_init(unsigned long *port) +{ +	/* Create a pthread for gRPC since it runs its own event loop. */ +	if (pthread_create(&grpc_pthread, NULL, grpc_pthread_start, port)) { +		flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s", +			 __func__, safe_strerror(errno)); +		return -1; +	} +	pthread_detach(grpc_pthread); + +	return 0; +} + +static int frr_grpc_finish(void) +{ +	// TODO: cancel the gRPC pthreads gracefully. + +	return 0; +} + +static int frr_grpc_module_late_init(struct thread_master *tm) +{ +	static unsigned long port = GRPC_DEFAULT_PORT; +	const char *args = THIS_MODULE->load_args; + +	// Parse port number. +	if (args) { +		try { +			port = std::stoul(args); +			if (port < 1024) +				throw std::invalid_argument( +					"can't use privileged port"); +			if (port > UINT16_MAX) +				throw std::invalid_argument( +					"port number is too big"); +		} catch (std::exception &e) { +			flog_err(EC_LIB_GRPC_INIT, +				 "%s: failed to parse port number: %s", +				 __func__, e.what()); +			goto error; +		} +	} + +	if (frr_grpc_init(&port) < 0) +		goto error; + +	hook_register(frr_fini, frr_grpc_finish); + +	return 0; + +error: +	flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module"); +	return -1; +} + +static int frr_grpc_module_init(void) +{ +	hook_register(frr_late_init, frr_grpc_module_late_init); + +	return 0; +} + +FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION, +		 .description = "FRR gRPC northbound module", +		 .init = frr_grpc_module_init, )  | 
