summaryrefslogtreecommitdiff
path: root/lib/northbound_grpc.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/northbound_grpc.cpp')
-rw-r--r--lib/northbound_grpc.cpp1317
1 files changed, 894 insertions, 423 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp
index fd4101d679..96a11d67bd 100644
--- a/lib/northbound_grpc.cpp
+++ b/lib/northbound_grpc.cpp
@@ -54,12 +54,70 @@ static const struct frr_pthread_attr attr = {
.stop = NULL,
};
-class NorthboundImpl final : public frr::Northbound::Service
+enum CallStatus { CREATE, PROCESS, FINISH };
+
+/* Thanks gooble */
+class RpcStateBase
+{
+ public:
+ virtual void doCallback() = 0;
+};
+
+class NorthboundImpl;
+
+template <typename Q, typename S> class RpcState : RpcStateBase
+{
+ public:
+ RpcState(NorthboundImpl *svc,
+ void (NorthboundImpl::*cb)(RpcState<Q, S> *))
+ : callback(cb), responder(&ctx), async_responder(&ctx),
+ service(svc){};
+
+ void doCallback() override
+ {
+ (service->*callback)(this);
+ }
+
+ grpc::ServerContext ctx;
+ Q request;
+ S response;
+ grpc::ServerAsyncResponseWriter<S> responder;
+ grpc::ServerAsyncWriter<S> async_responder;
+
+ NorthboundImpl *service;
+ void (NorthboundImpl::*callback)(RpcState<Q, S> *);
+
+ void *context;
+ CallStatus state = CREATE;
+};
+
+#define REQUEST_RPC(NAME) \
+ do { \
+ auto _rpcState = \
+ new RpcState<frr::NAME##Request, frr::NAME##Response>( \
+ this, &NorthboundImpl::Handle##NAME); \
+ _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \
+ &_rpcState->responder, _cq, _cq, \
+ _rpcState); \
+ } while (0)
+
+#define REQUEST_RPC_STREAMING(NAME) \
+ do { \
+ auto _rpcState = \
+ new RpcState<frr::NAME##Request, frr::NAME##Response>( \
+ this, &NorthboundImpl::Handle##NAME); \
+ _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \
+ &_rpcState->async_responder, _cq, _cq, \
+ _rpcState); \
+ } while (0)
+
+class NorthboundImpl
{
public:
NorthboundImpl(void)
{
_nextCandidateId = 0;
+ _service = new frr::Northbound::AsyncService();
}
~NorthboundImpl(void)
@@ -70,61 +128,136 @@ class NorthboundImpl final : public frr::Northbound::Service
delete_candidate(&it->second);
}
- grpc::Status
- GetCapabilities(grpc::ServerContext *context,
- frr::GetCapabilitiesRequest const *request,
- frr::GetCapabilitiesResponse *response) override
+ void Run(unsigned long port)
+ {
+ grpc::ServerBuilder builder;
+ std::stringstream server_address;
+
+ server_address << "0.0.0.0:" << port;
+
+ builder.AddListeningPort(server_address.str(),
+ grpc::InsecureServerCredentials());
+ builder.RegisterService(_service);
+
+ auto cq = builder.AddCompletionQueue();
+ _cq = cq.get();
+ auto _server = builder.BuildAndStart();
+
+ /* Schedule all RPC handlers */
+ REQUEST_RPC(GetCapabilities);
+ REQUEST_RPC(CreateCandidate);
+ REQUEST_RPC(DeleteCandidate);
+ REQUEST_RPC(UpdateCandidate);
+ REQUEST_RPC(EditCandidate);
+ REQUEST_RPC(LoadToCandidate);
+ REQUEST_RPC(Commit);
+ REQUEST_RPC(GetTransaction);
+ REQUEST_RPC(LockConfig);
+ REQUEST_RPC(UnlockConfig);
+ REQUEST_RPC(Execute);
+ REQUEST_RPC_STREAMING(Get);
+ REQUEST_RPC_STREAMING(ListTransactions);
+
+ zlog_notice("gRPC server listening on %s",
+ server_address.str().c_str());
+
+ /* Process inbound RPCs */
+ void *tag;
+ bool ok;
+ while (true) {
+ _cq->Next(&tag, &ok);
+ GPR_ASSERT(ok);
+ static_cast<RpcStateBase *>(tag)->doCallback();
+ tag = nullptr;
+ }
+ }
+
+ void HandleGetCapabilities(RpcState<frr::GetCapabilitiesRequest,
+ frr::GetCapabilitiesResponse> *tag)
{
if (nb_dbg_client_grpc)
zlog_debug("received RPC GetCapabilities()");
- // Response: string frr_version = 1;
- response->set_frr_version(FRR_VERSION);
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(GetCapabilities);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+
+ // Response: string frr_version = 1;
+ tag->response.set_frr_version(FRR_VERSION);
- // Response: bool rollback_support = 2;
+ // Response: bool rollback_support = 2;
#ifdef HAVE_CONFIG_ROLLBACKS
- response->set_rollback_support(true);
+ tag->response.set_rollback_support(true);
#else
- response->set_rollback_support(false);
+ tag->response.set_rollback_support(false);
#endif
- // Response: repeated ModuleData supported_modules = 3;
- struct yang_module *module;
- RB_FOREACH (module, yang_modules, &yang_modules) {
- auto m = response->add_supported_modules();
+ // Response: repeated ModuleData supported_modules = 3;
+ struct yang_module *module;
+ RB_FOREACH (module, yang_modules, &yang_modules) {
+ auto m = tag->response.add_supported_modules();
- m->set_name(module->name);
- if (module->info->rev_size)
- m->set_revision(module->info->rev[0].date);
- m->set_organization(module->info->org);
- }
+ m->set_name(module->name);
+ if (module->info->rev_size)
+ m->set_revision(
+ module->info->rev[0].date);
+ m->set_organization(module->info->org);
+ }
- // Response: repeated Encoding supported_encodings = 4;
- response->add_supported_encodings(frr::JSON);
- response->add_supported_encodings(frr::XML);
+ // Response: repeated Encoding supported_encodings = 4;
+ tag->response.add_supported_encodings(frr::JSON);
+ tag->response.add_supported_encodings(frr::XML);
- return grpc::Status::OK;
+ tag->responder.Finish(tag->response, grpc::Status::OK,
+ tag);
+ tag->state = FINISH;
+ break;
+ }
+ case FINISH:
+ delete tag;
+ }
}
- grpc::Status Get(grpc::ServerContext *context,
- frr::GetRequest const *request,
- grpc::ServerWriter<frr::GetResponse> *writer) override
+ void HandleGet(RpcState<frr::GetRequest, frr::GetResponse> *tag)
{
- // Request: DataType type = 1;
- int type = request->type();
- // Request: Encoding encoding = 2;
- frr::Encoding encoding = request->encoding();
- // Request: bool with_defaults = 3;
- bool with_defaults = request->with_defaults();
+ switch (tag->state) {
+ case CREATE: {
+ auto mypaths = new std::list<std::string>();
+ tag->context = mypaths;
+ auto paths = tag->request.path();
+ for (const std::string &path : paths) {
+ mypaths->push_back(std::string(path));
+ }
+ REQUEST_RPC_STREAMING(Get);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+ // Request: DataType type = 1;
+ int type = tag->request.type();
+ // Request: Encoding encoding = 2;
+ frr::Encoding encoding = tag->request.encoding();
+ // Request: bool with_defaults = 3;
+ bool with_defaults = tag->request.with_defaults();
+
+ if (nb_dbg_client_grpc)
+ zlog_debug(
+ "received RPC Get(type: %u, encoding: %u, with_defaults: %u)",
+ type, encoding, with_defaults);
+
+ auto mypaths = static_cast<std::list<std::string> *>(
+ tag->context);
+
+ if (mypaths->empty()) {
+ tag->async_responder.Finish(grpc::Status::OK,
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC Get(type: %u, encoding: %u, with_defaults: %u)",
- type, encoding, with_defaults);
- // Request: repeated string path = 4;
- auto paths = request->path();
- for (const std::string &path : paths) {
frr::GetResponse response;
grpc::Status status;
@@ -133,391 +266,715 @@ class NorthboundImpl final : public frr::Northbound::Service
// Response: DataTree data = 2;
auto *data = response.mutable_data();
- data->set_encoding(request->encoding());
- status = get_path(data, path, type,
+ data->set_encoding(tag->request.encoding());
+ status = get_path(data, mypaths->back().c_str(), type,
encoding2lyd_format(encoding),
with_defaults);
// Something went wrong...
- if (!status.ok())
- return status;
+ if (!status.ok()) {
+ tag->async_responder.WriteAndFinish(
+ response, grpc::WriteOptions(), status,
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- writer->Write(response);
- }
+ mypaths->pop_back();
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC Get() end");
+ tag->async_responder.Write(response, tag);
- return grpc::Status::OK;
+ break;
+ }
+ case FINISH:
+ if (nb_dbg_client_grpc)
+ zlog_debug("received RPC Get() end");
+
+ delete static_cast<std::list<std::string> *>(
+ tag->context);
+ delete tag;
+ }
}
- grpc::Status
- CreateCandidate(grpc::ServerContext *context,
- frr::CreateCandidateRequest const *request,
- frr::CreateCandidateResponse *response) override
+ void HandleCreateCandidate(RpcState<frr::CreateCandidateRequest,
+ frr::CreateCandidateResponse> *tag)
{
if (nb_dbg_client_grpc)
zlog_debug("received RPC CreateCandidate()");
- struct candidate *candidate = create_candidate();
- if (!candidate)
- return grpc::Status(
- grpc::StatusCode::RESOURCE_EXHAUSTED,
- "Can't create candidate configuration");
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(CreateCandidate);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+ struct candidate *candidate = create_candidate();
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::
+ RESOURCE_EXHAUSTED,
+ "Can't create candidate configuration"),
+ tag);
+ } else {
+ tag->response.set_candidate_id(candidate->id);
+ tag->responder.Finish(tag->response,
+ grpc::Status::OK, tag);
+ }
- // Response: uint32 candidate_id = 1;
- response->set_candidate_id(candidate->id);
+ tag->state = FINISH;
- return grpc::Status::OK;
+ break;
+ }
+ case FINISH:
+ delete tag;
+ }
}
- grpc::Status
- DeleteCandidate(grpc::ServerContext *context,
- frr::DeleteCandidateRequest const *request,
- frr::DeleteCandidateResponse *response) override
+ void HandleDeleteCandidate(RpcState<frr::DeleteCandidateRequest,
+ frr::DeleteCandidateResponse> *tag)
{
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = request->candidate_id();
-
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC DeleteCandidate(candidate_id: %u)",
- candidate_id);
-
- struct candidate *candidate = get_candidate(candidate_id);
- if (!candidate)
- return grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found");
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(DeleteCandidate);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
+
+ if (nb_dbg_client_grpc)
+ zlog_debug(
+ "received RPC DeleteCandidate(candidate_id: %u)",
+ candidate_id);
+
+ struct candidate *candidate =
+ get_candidate(candidate_id);
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ tag->state = FINISH;
+ return;
+ } else {
+ delete_candidate(candidate);
+ tag->responder.Finish(tag->response,
+ grpc::Status::OK, tag);
+ tag->state = FINISH;
+ return;
+ }
+ tag->state = FINISH;
+ break;
+ }
+ case FINISH:
+ delete tag;
+ }
+ }
- delete_candidate(candidate);
+ void HandleUpdateCandidate(RpcState<frr::UpdateCandidateRequest,
+ frr::UpdateCandidateResponse> *tag)
+ {
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(UpdateCandidate);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
+
+ if (nb_dbg_client_grpc)
+ zlog_debug(
+ "received RPC UpdateCandidate(candidate_id: %u)",
+ candidate_id);
+
+ struct candidate *candidate =
+ get_candidate(candidate_id);
+
+ if (!candidate)
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ else if (candidate->transaction)
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::
+ FAILED_PRECONDITION,
+ "candidate is in the middle of a transaction"),
+ tag);
+ else if (nb_candidate_update(candidate->config)
+ != NB_OK)
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::INTERNAL,
+ "failed to update candidate configuration"),
+ tag);
+
+ else
+ tag->responder.Finish(tag->response,
+ grpc::Status::OK, tag);
+
+ tag->state = FINISH;
- return grpc::Status::OK;
+ break;
+ }
+ case FINISH:
+ delete tag;
+ }
}
- grpc::Status
- UpdateCandidate(grpc::ServerContext *context,
- frr::UpdateCandidateRequest const *request,
- frr::UpdateCandidateResponse *response) override
+ void HandleEditCandidate(RpcState<frr::EditCandidateRequest,
+ frr::EditCandidateResponse> *tag)
{
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = request->candidate_id();
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(EditCandidate);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
+
+ if (nb_dbg_client_grpc)
+ zlog_debug(
+ "received RPC EditCandidate(candidate_id: %u)",
+ candidate_id);
+
+ struct candidate *candidate =
+ get_candidate(candidate_id);
+
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ tag->state = FINISH;
+ break;
+ }
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC UpdateCandidate(candidate_id: %u)",
- candidate_id);
+ struct nb_config *candidate_tmp =
+ nb_config_dup(candidate->config);
+
+ auto pvs = tag->request.update();
+ for (const frr::PathValue &pv : pvs) {
+ if (yang_dnode_edit(candidate_tmp->dnode,
+ pv.path(), pv.value())
+ != 0) {
+ nb_config_free(candidate_tmp);
+
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::
+ INVALID_ARGUMENT,
+ "Failed to update \""
+ + pv.path()
+ + "\""),
+ tag);
+
+ tag->state = FINISH;
+ return;
+ }
+ }
+
+ pvs = tag->request.delete_();
+ for (const frr::PathValue &pv : pvs) {
+ if (yang_dnode_delete(candidate_tmp->dnode,
+ pv.path())
+ != 0) {
+ nb_config_free(candidate_tmp);
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::
+ INVALID_ARGUMENT,
+ "Failed to remove \""
+ + pv.path()
+ + "\""),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
+ }
- struct candidate *candidate = get_candidate(candidate_id);
- if (!candidate)
- return grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found");
+ // No errors, accept all changes.
+ nb_config_replace(candidate->config, candidate_tmp,
+ false);
- if (candidate->transaction)
- return grpc::Status(
- grpc::StatusCode::FAILED_PRECONDITION,
- "candidate is in the middle of a transaction");
+ tag->responder.Finish(tag->response, grpc::Status::OK,
+ tag);
- if (nb_candidate_update(candidate->config) != NB_OK)
- return grpc::Status(
- grpc::StatusCode::INTERNAL,
- "failed to update candidate configuration");
+ tag->state = FINISH;
- return grpc::Status::OK;
+ break;
+ }
+ case FINISH:
+ delete tag;
+ }
}
- grpc::Status
- EditCandidate(grpc::ServerContext *context,
- frr::EditCandidateRequest const *request,
- frr::EditCandidateResponse *response) override
+ void HandleLoadToCandidate(RpcState<frr::LoadToCandidateRequest,
+ frr::LoadToCandidateResponse> *tag)
{
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = request->candidate_id();
-
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC EditCandidate(candidate_id: %u)",
- candidate_id);
-
- struct candidate *candidate = get_candidate(candidate_id);
- if (!candidate)
- return grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found");
-
- // Create a copy of the candidate. For consistency, we need to
- // ensure that either all changes are accepted or none are (in
- // the event of an error).
- struct nb_config *candidate_tmp =
- nb_config_dup(candidate->config);
-
- auto pvs = request->update();
- for (const frr::PathValue &pv : pvs) {
- if (yang_dnode_edit(candidate_tmp->dnode, pv.path(),
- pv.value())
- != 0) {
- nb_config_free(candidate_tmp);
- return grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT,
- "Failed to update \"" + pv.path()
- + "\"");
- }
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(LoadToCandidate);
+ tag->state = PROCESS;
}
+ case PROCESS: {
+
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
+
+ if (nb_dbg_client_grpc)
+ zlog_debug(
+ "received RPC LoadToCandidate(candidate_id: %u)",
+ candidate_id);
+
+ // Request: LoadType type = 2;
+ int load_type = tag->request.type();
+ // Request: DataTree config = 3;
+ auto config = tag->request.config();
+
+
+ struct candidate *candidate =
+ get_candidate(candidate_id);
+
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- pvs = request->delete_();
- for (const frr::PathValue &pv : pvs) {
- if (yang_dnode_delete(candidate_tmp->dnode, pv.path())
- != 0) {
- nb_config_free(candidate_tmp);
- return grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT,
- "Failed to remove \"" + pv.path()
- + "\"");
+ struct lyd_node *dnode =
+ dnode_from_data_tree(&config, true);
+ if (!dnode) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::INTERNAL,
+ "Failed to parse the configuration"),
+ tag);
+ tag->state = FINISH;
+ return;
}
- }
- // No errors, accept all changes.
- nb_config_replace(candidate->config, candidate_tmp, false);
+ struct nb_config *loaded_config = nb_config_new(dnode);
+
+ if (load_type == frr::LoadToCandidateRequest::REPLACE)
+ nb_config_replace(candidate->config,
+ loaded_config, false);
+ else if (nb_config_merge(candidate->config,
+ loaded_config, false)
+ != NB_OK) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::INTERNAL,
+ "Failed to merge the loaded configuration"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- return grpc::Status::OK;
+ tag->responder.Finish(tag->response, grpc::Status::OK,
+ tag);
+ tag->state = FINISH;
+ break;
+ }
+ case FINISH:
+ delete tag;
+ }
}
- grpc::Status
- LoadToCandidate(grpc::ServerContext *context,
- frr::LoadToCandidateRequest const *request,
- frr::LoadToCandidateResponse *response) override
+ void
+ HandleCommit(RpcState<frr::CommitRequest, frr::CommitResponse> *tag)
{
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = request->candidate_id();
- // Request: LoadType type = 2;
- int load_type = request->type();
- // Request: DataTree config = 3;
- auto config = request->config();
-
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC LoadToCandidate(candidate_id: %u)",
- candidate_id);
-
- struct candidate *candidate = get_candidate(candidate_id);
- if (!candidate)
- return grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found");
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(Commit);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
+ if (nb_dbg_client_grpc)
+ zlog_debug(
+ "received RPC Commit(candidate_id: %u)",
+ candidate_id);
+
+ // Request: Phase phase = 2;
+ int phase = tag->request.phase();
+ // Request: string comment = 3;
+ const std::string comment = tag->request.comment();
+
+ // Find candidate configuration.
+ struct candidate *candidate =
+ get_candidate(candidate_id);
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- struct lyd_node *dnode = dnode_from_data_tree(&config, true);
- if (!dnode)
- return grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Failed to parse the configuration");
+ int ret = NB_OK;
+ uint32_t transaction_id = 0;
+
+ // Check for misuse of the two-phase commit protocol.
+ switch (phase) {
+ case frr::CommitRequest::PREPARE:
+ case frr::CommitRequest::ALL:
+ if (candidate->transaction) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::
+ FAILED_PRECONDITION,
+ "candidate is in the middle of a transaction"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
+ break;
+ case frr::CommitRequest::ABORT:
+ case frr::CommitRequest::APPLY:
+ if (!candidate->transaction) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::
+ FAILED_PRECONDITION,
+ "no transaction in progress"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
+ break;
+ default:
+ break;
+ }
- struct nb_config *loaded_config = nb_config_new(dnode);
- if (load_type == frr::LoadToCandidateRequest::REPLACE)
- nb_config_replace(candidate->config, loaded_config,
- false);
- else if (nb_config_merge(candidate->config, loaded_config,
- false)
- != NB_OK)
- return grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Failed to merge the loaded configuration");
+ // Execute the user request.
+ switch (phase) {
+ case frr::CommitRequest::VALIDATE:
+ ret = nb_candidate_validate(candidate->config);
+ break;
+ case frr::CommitRequest::PREPARE:
+ ret = nb_candidate_commit_prepare(
+ candidate->config, NB_CLIENT_GRPC, NULL,
+ comment.c_str(),
+ &candidate->transaction);
+ break;
+ case frr::CommitRequest::ABORT:
+ nb_candidate_commit_abort(
+ candidate->transaction);
+ break;
+ case frr::CommitRequest::APPLY:
+ nb_candidate_commit_apply(
+ candidate->transaction, true,
+ &transaction_id);
+ break;
+ case frr::CommitRequest::ALL:
+ ret = nb_candidate_commit(
+ candidate->config, NB_CLIENT_GRPC, NULL,
+ true, comment.c_str(), &transaction_id);
+ break;
+ }
- return grpc::Status::OK;
- }
+ // Map northbound error codes to gRPC error codes.
+ switch (ret) {
+ case NB_ERR_NO_CHANGES:
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::ABORTED,
+ "No configuration changes detected"),
+ tag);
+ tag->state = FINISH;
+ return;
+ case NB_ERR_LOCKED:
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::UNAVAILABLE,
+ "There's already a transaction in progress"),
+ tag);
+ tag->state = FINISH;
+ return;
+ case NB_ERR_VALIDATION:
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::
+ INVALID_ARGUMENT,
+ "Validation error"),
+ tag);
+ tag->state = FINISH;
+ return;
+ case NB_ERR_RESOURCE:
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::
+ RESOURCE_EXHAUSTED,
+ "Failed do allocate resources"),
+ tag);
+ tag->state = FINISH;
+ return;
+ case NB_ERR:
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INTERNAL,
+ "Internal error"),
+ tag);
+ tag->state = FINISH;
+ return;
+ default:
+ break;
+ }
- grpc::Status Commit(grpc::ServerContext *context,
- frr::CommitRequest const *request,
- frr::CommitResponse *response) override
- {
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = request->candidate_id();
- // Request: Phase phase = 2;
- int phase = request->phase();
- // Request: string comment = 3;
- const std::string comment = request->comment();
+ // Response: uint32 transaction_id = 1;
+ if (transaction_id)
+ tag->response.set_transaction_id(
+ transaction_id);
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC Commit(candidate_id: %u)",
- candidate_id);
-
- // Find candidate configuration.
- struct candidate *candidate = get_candidate(candidate_id);
- if (!candidate)
- return grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found");
-
- int ret = NB_OK;
- uint32_t transaction_id = 0;
-
- // Check for misuse of the two-phase commit protocol.
- switch (phase) {
- case frr::CommitRequest::PREPARE:
- case frr::CommitRequest::ALL:
- if (candidate->transaction)
- return grpc::Status(
- grpc::StatusCode::FAILED_PRECONDITION,
- "pending transaction in progress");
- break;
- case frr::CommitRequest::ABORT:
- case frr::CommitRequest::APPLY:
- if (!candidate->transaction)
- return grpc::Status(
- grpc::StatusCode::FAILED_PRECONDITION,
- "no transaction in progress");
- break;
- default:
- break;
- }
+ tag->responder.Finish(tag->response, grpc::Status::OK,
+ tag);
+ tag->state = FINISH;
- // Execute the user request.
- switch (phase) {
- case frr::CommitRequest::VALIDATE:
- ret = nb_candidate_validate(candidate->config);
- break;
- case frr::CommitRequest::PREPARE:
- ret = nb_candidate_commit_prepare(
- candidate->config, NB_CLIENT_GRPC, NULL,
- comment.c_str(), &candidate->transaction);
- break;
- case frr::CommitRequest::ABORT:
- nb_candidate_commit_abort(candidate->transaction);
- break;
- case frr::CommitRequest::APPLY:
- nb_candidate_commit_apply(candidate->transaction, true,
- &transaction_id);
- break;
- case frr::CommitRequest::ALL:
- ret = nb_candidate_commit(
- candidate->config, NB_CLIENT_GRPC, NULL, true,
- comment.c_str(), &transaction_id);
break;
}
-
- // Map northbound error codes to gRPC error codes.
- switch (ret) {
- case NB_ERR_NO_CHANGES:
- return grpc::Status(
- grpc::StatusCode::ABORTED,
- "No configuration changes detected");
- case NB_ERR_LOCKED:
- return grpc::Status(
- grpc::StatusCode::UNAVAILABLE,
- "There's already a transaction in progress");
- case NB_ERR_VALIDATION:
- return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- "Validation error");
- case NB_ERR_RESOURCE:
- return grpc::Status(
- grpc::StatusCode::RESOURCE_EXHAUSTED,
- "Failed do allocate resources");
- case NB_ERR:
- return grpc::Status(grpc::StatusCode::INTERNAL,
- "Internal error");
- default:
- break;
+ case FINISH:
+ delete tag;
}
-
- // Response: uint32 transaction_id = 1;
- if (transaction_id)
- response->set_transaction_id(transaction_id);
-
- return grpc::Status::OK;
}
- grpc::Status
- ListTransactions(grpc::ServerContext *context,
- frr::ListTransactionsRequest const *request,
- grpc::ServerWriter<frr::ListTransactionsResponse>
- *writer) override
+ void
+ HandleListTransactions(RpcState<frr::ListTransactionsRequest,
+ frr::ListTransactionsResponse> *tag)
{
if (nb_dbg_client_grpc)
zlog_debug("received RPC ListTransactions()");
- nb_db_transactions_iterate(list_transactions_cb, writer);
-
- return grpc::Status::OK;
- }
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC_STREAMING(ListTransactions);
+ tag->context = new std::list<std::tuple<
+ int, std::string, std::string, std::string>>();
+ nb_db_transactions_iterate(list_transactions_cb,
+ tag->context);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+ auto list = static_cast<std::list<std::tuple<
+ int, std::string, std::string, std::string>> *>(
+ tag->context);
+ if (list->empty()) {
+ tag->async_responder.Finish(grpc::Status::OK,
+ tag);
+ tag->state = FINISH;
+ return;
+ }
+ auto item = list->back();
- grpc::Status
- GetTransaction(grpc::ServerContext *context,
- frr::GetTransactionRequest const *request,
- frr::GetTransactionResponse *response) override
- {
- struct nb_config *nb_config;
- // Request: uint32 transaction_id = 1;
- uint32_t transaction_id = request->transaction_id();
- // Request: Encoding encoding = 2;
- frr::Encoding encoding = request->encoding();
- // Request: bool with_defaults = 3;
- bool with_defaults = request->with_defaults();
+ frr::ListTransactionsResponse response;
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC GetTransaction(transaction_id: %u, encoding: %u)",
- transaction_id, encoding);
+ // Response: uint32 id = 1;
+ response.set_id(std::get<0>(item));
- // Load configuration from the transactions database.
- nb_config = nb_db_transaction_load(transaction_id);
- if (!nb_config)
- return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- "Transaction not found");
+ // Response: string client = 2;
+ response.set_client(std::get<1>(item).c_str());
- // Response: DataTree config = 1;
- auto config = response->mutable_config();
- config->set_encoding(encoding);
+ // Response: string date = 3;
+ response.set_date(std::get<2>(item).c_str());
- // Dump data using the requested format.
- if (data_tree_from_dnode(config, nb_config->dnode,
- encoding2lyd_format(encoding),
- with_defaults)
- != 0) {
- nb_config_free(nb_config);
- return grpc::Status(grpc::StatusCode::INTERNAL,
- "Failed to dump data");
- }
+ // Response: string comment = 4;
+ response.set_comment(std::get<3>(item).c_str());
- nb_config_free(nb_config);
+ list->pop_back();
- return grpc::Status::OK;
+ tag->async_responder.Write(response, tag);
+ break;
+ }
+ case FINISH:
+ delete static_cast<std::list<std::tuple<
+ int, std::string, std::string, std::string>> *>(
+ tag->context);
+ delete tag;
+ }
}
- grpc::Status LockConfig(grpc::ServerContext *context,
- frr::LockConfigRequest const *request,
- frr::LockConfigResponse *response) override
+ void HandleGetTransaction(RpcState<frr::GetTransactionRequest,
+ frr::GetTransactionResponse> *tag)
{
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC LockConfig()");
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(GetTransaction);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+ // Request: uint32 transaction_id = 1;
+ uint32_t transaction_id = tag->request.transaction_id();
+ // Request: Encoding encoding = 2;
+ frr::Encoding encoding = tag->request.encoding();
+ // Request: bool with_defaults = 3;
+ bool with_defaults = tag->request.with_defaults();
+
+ if (nb_dbg_client_grpc)
+ zlog_debug(
+ "received RPC GetTransaction(transaction_id: %u, encoding: %u)",
+ transaction_id, encoding);
+
+ struct nb_config *nb_config;
+
+ // Load configuration from the transactions database.
+ nb_config = nb_db_transaction_load(transaction_id);
+ if (!nb_config) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::
+ INVALID_ARGUMENT,
+ "Transaction not found"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- if (nb_running_lock(NB_CLIENT_GRPC, NULL))
- return grpc::Status(
- grpc::StatusCode::FAILED_PRECONDITION,
- "running configuration is locked already");
+ // Response: DataTree config = 1;
+ auto config = tag->response.mutable_config();
+ config->set_encoding(encoding);
- return grpc::Status::OK;
+ // Dump data using the requested format.
+ if (data_tree_from_dnode(config, nb_config->dnode,
+ encoding2lyd_format(encoding),
+ with_defaults)
+ != 0) {
+ nb_config_free(nb_config);
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INTERNAL,
+ "Failed to dump data"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
+
+ nb_config_free(nb_config);
+
+ tag->responder.Finish(tag->response, grpc::Status::OK,
+ tag);
+ tag->state = FINISH;
+ break;
+ }
+ case FINISH:
+ delete tag;
+ }
}
- grpc::Status UnlockConfig(grpc::ServerContext *context,
- frr::UnlockConfigRequest const *request,
- frr::UnlockConfigResponse *response) override
+ void HandleLockConfig(
+ RpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
{
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC UnlockConfig()");
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(LockConfig);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+ auto rpcState = new RpcState<frr::LockConfigRequest,
+ frr::LockConfigResponse>(
+ this, &NorthboundImpl::HandleLockConfig);
+ _service->RequestLockConfig(
+ &rpcState->ctx, &rpcState->request,
+ &rpcState->responder, _cq, _cq, rpcState);
+
+ if (nb_dbg_client_grpc)
+ zlog_debug("received RPC LockConfig()");
+
+ if (nb_running_lock(NB_CLIENT_GRPC, NULL)) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::
+ FAILED_PRECONDITION,
+ "running configuration is locked already"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
+
+ tag->responder.Finish(tag->response, grpc::Status::OK,
+ tag);
+ tag->state = FINISH;
+ break;
+ }
+ case FINISH:
+ delete tag;
+ }
+ }
- if (nb_running_unlock(NB_CLIENT_GRPC, NULL))
- return grpc::Status(
- grpc::StatusCode::FAILED_PRECONDITION,
- "failed to unlock the running configuration");
+ void HandleUnlockConfig(RpcState<frr::UnlockConfigRequest,
+ frr::UnlockConfigResponse> *tag)
+ {
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(UnlockConfig);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+
+ if (nb_dbg_client_grpc)
+ zlog_debug("received RPC UnlockConfig()");
+
+ if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::
+ FAILED_PRECONDITION,
+ "failed to unlock the running configuration"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- return grpc::Status::OK;
+ tag->responder.Finish(tag->response, grpc::Status::OK,
+ tag);
+ tag->state = FINISH;
+ break;
+ }
+ case FINISH:
+ delete tag;
+ }
}
- grpc::Status Execute(grpc::ServerContext *context,
- frr::ExecuteRequest const *request,
- frr::ExecuteResponse *response) override
+ void
+ HandleExecute(RpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
{
struct nb_node *nb_node;
struct list *input_list;
@@ -526,61 +983,100 @@ class NorthboundImpl final : public frr::Northbound::Service
struct yang_data *data;
const char *xpath;
- // Request: string path = 1;
- xpath = request->path().c_str();
+ switch (tag->state) {
+ case CREATE: {
+ REQUEST_RPC(Execute);
+ tag->state = PROCESS;
+ }
+ case PROCESS: {
+
+ // Request: string path = 1;
+ xpath = tag->request.path().c_str();
+
+ if (nb_dbg_client_grpc)
+ zlog_debug("received RPC Execute(path: \"%s\")",
+ xpath);
+
+ if (tag->request.path().empty()) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::
+ INVALID_ARGUMENT,
+ "Data path is empty"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC Execute(path: \"%s\")", xpath);
+ nb_node = nb_node_find(xpath);
+ if (!nb_node) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::
+ INVALID_ARGUMENT,
+ "Unknown data path"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- if (request->path().empty())
- return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- "Data path is empty");
+ input_list = yang_data_list_new();
+ output_list = yang_data_list_new();
- nb_node = nb_node_find(xpath);
- if (!nb_node)
- return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- "Unknown data path");
+ // Read input parameters.
+ auto input = tag->request.input();
+ for (const frr::PathValue &pv : input) {
+ // Request: repeated PathValue input = 2;
+ data = yang_data_new(pv.path().c_str(),
+ pv.value().c_str());
+ listnode_add(input_list, data);
+ }
- input_list = yang_data_list_new();
- output_list = yang_data_list_new();
+ // Execute callback registered for this XPath.
+ if (nb_callback_rpc(nb_node, xpath, input_list,
+ output_list)
+ != NB_OK) {
+ flog_warn(EC_LIB_NB_CB_RPC,
+ "%s: rpc callback failed: %s",
+ __func__, xpath);
+ list_delete(&input_list);
+ list_delete(&output_list);
+
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INTERNAL,
+ "RPC failed"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- // Read input parameters.
- auto input = request->input();
- for (const frr::PathValue &pv : input) {
- // Request: repeated PathValue input = 2;
- data = yang_data_new(pv.path().c_str(),
- pv.value().c_str());
- listnode_add(input_list, data);
- }
+ // Process output parameters.
+ for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
+ // Response: repeated PathValue output = 1;
+ frr::PathValue *pv = tag->response.add_output();
+ pv->set_path(data->xpath);
+ pv->set_value(data->value);
+ }
- // Execute callback registered for this XPath.
- if (nb_callback_rpc(nb_node, xpath, input_list, output_list)
- != NB_OK) {
- flog_warn(EC_LIB_NB_CB_RPC,
- "%s: rpc callback failed: %s", __func__,
- xpath);
+ // Release memory.
list_delete(&input_list);
list_delete(&output_list);
- return grpc::Status(grpc::StatusCode::INTERNAL,
- "RPC failed");
- }
- // Process output parameters.
- for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
- // Response: repeated PathValue output = 1;
- frr::PathValue *pv = response->add_output();
- pv->set_path(data->xpath);
- pv->set_value(data->value);
+ tag->responder.Finish(tag->response, grpc::Status::OK,
+ tag);
+ tag->state = FINISH;
+ break;
+ }
+ case FINISH:
+ delete tag;
}
-
- // Release memory.
- list_delete(&input_list);
- list_delete(&output_list);
-
- return grpc::Status::OK;
}
private:
+ frr::Northbound::AsyncService *_service;
+ grpc::ServerCompletionQueue *_cq;
+
struct candidate {
uint32_t id;
struct nb_config *config;
@@ -649,24 +1145,12 @@ class NorthboundImpl final : public frr::Northbound::Service
const char *client_name,
const char *date, const char *comment)
{
- grpc::ServerWriter<frr::ListTransactionsResponse> *writer =
- static_cast<grpc::ServerWriter<
- frr::ListTransactionsResponse> *>(arg);
- frr::ListTransactionsResponse response;
-
- // Response: uint32 id = 1;
- response.set_id(transaction_id);
-
- // Response: string client = 2;
- response.set_client(client_name);
-
- // Response: string date = 3;
- response.set_date(date);
- // Response: string comment = 4;
- response.set_comment(comment);
-
- writer->Write(response);
+ auto list = static_cast<std::list<std::tuple<
+ int, std::string, std::string, std::string>> *>(arg);
+ list->push_back(std::make_tuple(
+ transaction_id, std::string(client_name),
+ std::string(date), std::string(comment)));
}
static int data_tree_from_dnode(frr::DataTree *dt,
@@ -855,24 +1339,11 @@ static void *grpc_pthread_start(void *arg)
{
struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
unsigned long *port = static_cast<unsigned long *>(fpt->data);
- NorthboundImpl service;
- std::stringstream server_address;
frr_pthread_set_name(fpt);
- server_address << "0.0.0.0:" << *port;
-
- grpc::ServerBuilder builder;
- builder.AddListeningPort(server_address.str(),
- grpc::InsecureServerCredentials());
- builder.RegisterService(&service);
-
- std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
-
- zlog_notice("gRPC server listening on %s",
- server_address.str().c_str());
-
- server->Wait();
+ NorthboundImpl nb;
+ nb.Run(*port);
return NULL;
}