From ecf9fb30b75370c1354d07ae215c4950e5131dd8 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 6 May 2020 19:36:19 -0400 Subject: [PATCH] lib: convert GRPC plugin to async Synchronous GRPC services are called from arbitrary threads. This makes access to anything outside the GRPC module unsafe. We need to convert the plugin to use the async model that allows us to control our own threads. Signed-off-by: Quentin Young --- lib/northbound_grpc.cpp | 1317 ++++++++++++++++++++++++++------------- 1 file changed, 894 insertions(+), 423 deletions(-) diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index fd4101d679..96a11d67bd 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -54,12 +54,70 @@ static const struct frr_pthread_attr attr = { .stop = NULL, }; -class NorthboundImpl final : public frr::Northbound::Service +enum CallStatus { CREATE, PROCESS, FINISH }; + +/* Thanks gooble */ +class RpcStateBase +{ + public: + virtual void doCallback() = 0; +}; + +class NorthboundImpl; + +template class RpcState : RpcStateBase +{ + public: + RpcState(NorthboundImpl *svc, + void (NorthboundImpl::*cb)(RpcState *)) + : callback(cb), responder(&ctx), async_responder(&ctx), + service(svc){}; + + void doCallback() override + { + (service->*callback)(this); + } + + grpc::ServerContext ctx; + Q request; + S response; + grpc::ServerAsyncResponseWriter responder; + grpc::ServerAsyncWriter async_responder; + + NorthboundImpl *service; + void (NorthboundImpl::*callback)(RpcState *); + + void *context; + CallStatus state = CREATE; +}; + +#define REQUEST_RPC(NAME) \ + do { \ + auto _rpcState = \ + new RpcState( \ + this, &NorthboundImpl::Handle##NAME); \ + _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \ + &_rpcState->responder, _cq, _cq, \ + _rpcState); \ + } while (0) + +#define REQUEST_RPC_STREAMING(NAME) \ + do { \ + auto _rpcState = \ + new RpcState( \ + this, &NorthboundImpl::Handle##NAME); \ + _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \ + &_rpcState->async_responder, _cq, _cq, \ + _rpcState); \ + } while (0) + +class NorthboundImpl { public: NorthboundImpl(void) { _nextCandidateId = 0; + _service = new frr::Northbound::AsyncService(); } ~NorthboundImpl(void) @@ -70,61 +128,136 @@ class NorthboundImpl final : public frr::Northbound::Service delete_candidate(&it->second); } - grpc::Status - GetCapabilities(grpc::ServerContext *context, - frr::GetCapabilitiesRequest const *request, - frr::GetCapabilitiesResponse *response) override + void Run(unsigned long port) + { + grpc::ServerBuilder builder; + std::stringstream server_address; + + server_address << "0.0.0.0:" << port; + + builder.AddListeningPort(server_address.str(), + grpc::InsecureServerCredentials()); + builder.RegisterService(_service); + + auto cq = builder.AddCompletionQueue(); + _cq = cq.get(); + auto _server = builder.BuildAndStart(); + + /* Schedule all RPC handlers */ + REQUEST_RPC(GetCapabilities); + REQUEST_RPC(CreateCandidate); + REQUEST_RPC(DeleteCandidate); + REQUEST_RPC(UpdateCandidate); + REQUEST_RPC(EditCandidate); + REQUEST_RPC(LoadToCandidate); + REQUEST_RPC(Commit); + REQUEST_RPC(GetTransaction); + REQUEST_RPC(LockConfig); + REQUEST_RPC(UnlockConfig); + REQUEST_RPC(Execute); + REQUEST_RPC_STREAMING(Get); + REQUEST_RPC_STREAMING(ListTransactions); + + zlog_notice("gRPC server listening on %s", + server_address.str().c_str()); + + /* Process inbound RPCs */ + void *tag; + bool ok; + while (true) { + _cq->Next(&tag, &ok); + GPR_ASSERT(ok); + static_cast(tag)->doCallback(); + tag = nullptr; + } + } + + void HandleGetCapabilities(RpcState *tag) { if (nb_dbg_client_grpc) zlog_debug("received RPC GetCapabilities()"); - // Response: string frr_version = 1; - response->set_frr_version(FRR_VERSION); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(GetCapabilities); + tag->state = PROCESS; + } + case PROCESS: { + + // Response: string frr_version = 1; + tag->response.set_frr_version(FRR_VERSION); - // Response: bool rollback_support = 2; + // Response: bool rollback_support = 2; #ifdef HAVE_CONFIG_ROLLBACKS - response->set_rollback_support(true); + tag->response.set_rollback_support(true); #else - response->set_rollback_support(false); + tag->response.set_rollback_support(false); #endif - // Response: repeated ModuleData supported_modules = 3; - struct yang_module *module; - RB_FOREACH (module, yang_modules, &yang_modules) { - auto m = response->add_supported_modules(); + // Response: repeated ModuleData supported_modules = 3; + struct yang_module *module; + RB_FOREACH (module, yang_modules, &yang_modules) { + auto m = tag->response.add_supported_modules(); - m->set_name(module->name); - if (module->info->rev_size) - m->set_revision(module->info->rev[0].date); - m->set_organization(module->info->org); - } + m->set_name(module->name); + if (module->info->rev_size) + m->set_revision( + module->info->rev[0].date); + m->set_organization(module->info->org); + } - // Response: repeated Encoding supported_encodings = 4; - response->add_supported_encodings(frr::JSON); - response->add_supported_encodings(frr::XML); + // Response: repeated Encoding supported_encodings = 4; + tag->response.add_supported_encodings(frr::JSON); + tag->response.add_supported_encodings(frr::XML); - return grpc::Status::OK; + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status Get(grpc::ServerContext *context, - frr::GetRequest const *request, - grpc::ServerWriter *writer) override + void HandleGet(RpcState *tag) { - // Request: DataType type = 1; - int type = request->type(); - // Request: Encoding encoding = 2; - frr::Encoding encoding = request->encoding(); - // Request: bool with_defaults = 3; - bool with_defaults = request->with_defaults(); + switch (tag->state) { + case CREATE: { + auto mypaths = new std::list(); + tag->context = mypaths; + auto paths = tag->request.path(); + for (const std::string &path : paths) { + mypaths->push_back(std::string(path)); + } + REQUEST_RPC_STREAMING(Get); + tag->state = PROCESS; + } + case PROCESS: { + // Request: DataType type = 1; + int type = tag->request.type(); + // Request: Encoding encoding = 2; + frr::Encoding encoding = tag->request.encoding(); + // Request: bool with_defaults = 3; + bool with_defaults = tag->request.with_defaults(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC Get(type: %u, encoding: %u, with_defaults: %u)", + type, encoding, with_defaults); + + auto mypaths = static_cast *>( + tag->context); + + if (mypaths->empty()) { + tag->async_responder.Finish(grpc::Status::OK, + tag); + tag->state = FINISH; + return; + } - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC Get(type: %u, encoding: %u, with_defaults: %u)", - type, encoding, with_defaults); - // Request: repeated string path = 4; - auto paths = request->path(); - for (const std::string &path : paths) { frr::GetResponse response; grpc::Status status; @@ -133,391 +266,715 @@ class NorthboundImpl final : public frr::Northbound::Service // Response: DataTree data = 2; auto *data = response.mutable_data(); - data->set_encoding(request->encoding()); - status = get_path(data, path, type, + data->set_encoding(tag->request.encoding()); + status = get_path(data, mypaths->back().c_str(), type, encoding2lyd_format(encoding), with_defaults); // Something went wrong... - if (!status.ok()) - return status; + if (!status.ok()) { + tag->async_responder.WriteAndFinish( + response, grpc::WriteOptions(), status, + tag); + tag->state = FINISH; + return; + } - writer->Write(response); - } + mypaths->pop_back(); - if (nb_dbg_client_grpc) - zlog_debug("received RPC Get() end"); + tag->async_responder.Write(response, tag); - return grpc::Status::OK; + break; + } + case FINISH: + if (nb_dbg_client_grpc) + zlog_debug("received RPC Get() end"); + + delete static_cast *>( + tag->context); + delete tag; + } } - grpc::Status - CreateCandidate(grpc::ServerContext *context, - frr::CreateCandidateRequest const *request, - frr::CreateCandidateResponse *response) override + void HandleCreateCandidate(RpcState *tag) { if (nb_dbg_client_grpc) zlog_debug("received RPC CreateCandidate()"); - struct candidate *candidate = create_candidate(); - if (!candidate) - return grpc::Status( - grpc::StatusCode::RESOURCE_EXHAUSTED, - "Can't create candidate configuration"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(CreateCandidate); + tag->state = PROCESS; + } + case PROCESS: { + struct candidate *candidate = create_candidate(); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + RESOURCE_EXHAUSTED, + "Can't create candidate configuration"), + tag); + } else { + tag->response.set_candidate_id(candidate->id); + tag->responder.Finish(tag->response, + grpc::Status::OK, tag); + } - // Response: uint32 candidate_id = 1; - response->set_candidate_id(candidate->id); + tag->state = FINISH; - return grpc::Status::OK; + break; + } + case FINISH: + delete tag; + } } - grpc::Status - DeleteCandidate(grpc::ServerContext *context, - frr::DeleteCandidateRequest const *request, - frr::DeleteCandidateResponse *response) override + void HandleDeleteCandidate(RpcState *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC DeleteCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(DeleteCandidate); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC DeleteCandidate(candidate_id: %u)", + candidate_id); + + struct candidate *candidate = + get_candidate(candidate_id); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } else { + delete_candidate(candidate); + tag->responder.Finish(tag->response, + grpc::Status::OK, tag); + tag->state = FINISH; + return; + } + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } + } - delete_candidate(candidate); + void HandleUpdateCandidate(RpcState *tag) + { + switch (tag->state) { + case CREATE: { + REQUEST_RPC(UpdateCandidate); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC UpdateCandidate(candidate_id: %u)", + candidate_id); + + struct candidate *candidate = + get_candidate(candidate_id); + + if (!candidate) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + else if (candidate->transaction) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "candidate is in the middle of a transaction"), + tag); + else if (nb_candidate_update(candidate->config) + != NB_OK) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "failed to update candidate configuration"), + tag); + + else + tag->responder.Finish(tag->response, + grpc::Status::OK, tag); + + tag->state = FINISH; - return grpc::Status::OK; + break; + } + case FINISH: + delete tag; + } } - grpc::Status - UpdateCandidate(grpc::ServerContext *context, - frr::UpdateCandidateRequest const *request, - frr::UpdateCandidateResponse *response) override + void HandleEditCandidate(RpcState *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(EditCandidate); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC EditCandidate(candidate_id: %u)", + candidate_id); + + struct candidate *candidate = + get_candidate(candidate_id); + + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + break; + } - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC UpdateCandidate(candidate_id: %u)", - candidate_id); + struct nb_config *candidate_tmp = + nb_config_dup(candidate->config); + + auto pvs = tag->request.update(); + for (const frr::PathValue &pv : pvs) { + if (yang_dnode_edit(candidate_tmp->dnode, + pv.path(), pv.value()) + != 0) { + nb_config_free(candidate_tmp); + + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + INVALID_ARGUMENT, + "Failed to update \"" + + pv.path() + + "\""), + tag); + + tag->state = FINISH; + return; + } + } + + pvs = tag->request.delete_(); + for (const frr::PathValue &pv : pvs) { + if (yang_dnode_delete(candidate_tmp->dnode, + pv.path()) + != 0) { + nb_config_free(candidate_tmp); + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + INVALID_ARGUMENT, + "Failed to remove \"" + + pv.path() + + "\""), + tag); + tag->state = FINISH; + return; + } + } - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); + // No errors, accept all changes. + nb_config_replace(candidate->config, candidate_tmp, + false); - if (candidate->transaction) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "candidate is in the middle of a transaction"); + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); - if (nb_candidate_update(candidate->config) != NB_OK) - return grpc::Status( - grpc::StatusCode::INTERNAL, - "failed to update candidate configuration"); + tag->state = FINISH; - return grpc::Status::OK; + break; + } + case FINISH: + delete tag; + } } - grpc::Status - EditCandidate(grpc::ServerContext *context, - frr::EditCandidateRequest const *request, - frr::EditCandidateResponse *response) override + void HandleLoadToCandidate(RpcState *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC EditCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); - - // Create a copy of the candidate. For consistency, we need to - // ensure that either all changes are accepted or none are (in - // the event of an error). - struct nb_config *candidate_tmp = - nb_config_dup(candidate->config); - - auto pvs = request->update(); - for (const frr::PathValue &pv : pvs) { - if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), - pv.value()) - != 0) { - nb_config_free(candidate_tmp); - return grpc::Status( - grpc::StatusCode::INVALID_ARGUMENT, - "Failed to update \"" + pv.path() - + "\""); - } + switch (tag->state) { + case CREATE: { + REQUEST_RPC(LoadToCandidate); + tag->state = PROCESS; } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC LoadToCandidate(candidate_id: %u)", + candidate_id); + + // Request: LoadType type = 2; + int load_type = tag->request.type(); + // Request: DataTree config = 3; + auto config = tag->request.config(); + + + struct candidate *candidate = + get_candidate(candidate_id); + + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } - pvs = request->delete_(); - for (const frr::PathValue &pv : pvs) { - if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) - != 0) { - nb_config_free(candidate_tmp); - return grpc::Status( - grpc::StatusCode::INVALID_ARGUMENT, - "Failed to remove \"" + pv.path() - + "\""); + struct lyd_node *dnode = + dnode_from_data_tree(&config, true); + if (!dnode) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "Failed to parse the configuration"), + tag); + tag->state = FINISH; + return; } - } - // No errors, accept all changes. - nb_config_replace(candidate->config, candidate_tmp, false); + struct nb_config *loaded_config = nb_config_new(dnode); + + if (load_type == frr::LoadToCandidateRequest::REPLACE) + nb_config_replace(candidate->config, + loaded_config, false); + else if (nb_config_merge(candidate->config, + loaded_config, false) + != NB_OK) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "Failed to merge the loaded configuration"), + tag); + tag->state = FINISH; + return; + } - return grpc::Status::OK; + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status - LoadToCandidate(grpc::ServerContext *context, - frr::LoadToCandidateRequest const *request, - frr::LoadToCandidateResponse *response) override + void + HandleCommit(RpcState *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - // Request: LoadType type = 2; - int load_type = request->type(); - // Request: DataTree config = 3; - auto config = request->config(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC LoadToCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(Commit); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC Commit(candidate_id: %u)", + candidate_id); + + // Request: Phase phase = 2; + int phase = tag->request.phase(); + // Request: string comment = 3; + const std::string comment = tag->request.comment(); + + // Find candidate configuration. + struct candidate *candidate = + get_candidate(candidate_id); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } - struct lyd_node *dnode = dnode_from_data_tree(&config, true); - if (!dnode) - return grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to parse the configuration"); + int ret = NB_OK; + uint32_t transaction_id = 0; + + // Check for misuse of the two-phase commit protocol. + switch (phase) { + case frr::CommitRequest::PREPARE: + case frr::CommitRequest::ALL: + if (candidate->transaction) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "candidate is in the middle of a transaction"), + tag); + tag->state = FINISH; + return; + } + break; + case frr::CommitRequest::ABORT: + case frr::CommitRequest::APPLY: + if (!candidate->transaction) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "no transaction in progress"), + tag); + tag->state = FINISH; + return; + } + break; + default: + break; + } - struct nb_config *loaded_config = nb_config_new(dnode); - if (load_type == frr::LoadToCandidateRequest::REPLACE) - nb_config_replace(candidate->config, loaded_config, - false); - else if (nb_config_merge(candidate->config, loaded_config, - false) - != NB_OK) - return grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to merge the loaded configuration"); + // Execute the user request. + switch (phase) { + case frr::CommitRequest::VALIDATE: + ret = nb_candidate_validate(candidate->config); + break; + case frr::CommitRequest::PREPARE: + ret = nb_candidate_commit_prepare( + candidate->config, NB_CLIENT_GRPC, NULL, + comment.c_str(), + &candidate->transaction); + break; + case frr::CommitRequest::ABORT: + nb_candidate_commit_abort( + candidate->transaction); + break; + case frr::CommitRequest::APPLY: + nb_candidate_commit_apply( + candidate->transaction, true, + &transaction_id); + break; + case frr::CommitRequest::ALL: + ret = nb_candidate_commit( + candidate->config, NB_CLIENT_GRPC, NULL, + true, comment.c_str(), &transaction_id); + break; + } - return grpc::Status::OK; - } + // Map northbound error codes to gRPC error codes. + switch (ret) { + case NB_ERR_NO_CHANGES: + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::ABORTED, + "No configuration changes detected"), + tag); + tag->state = FINISH; + return; + case NB_ERR_LOCKED: + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::UNAVAILABLE, + "There's already a transaction in progress"), + tag); + tag->state = FINISH; + return; + case NB_ERR_VALIDATION: + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Validation error"), + tag); + tag->state = FINISH; + return; + case NB_ERR_RESOURCE: + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + RESOURCE_EXHAUSTED, + "Failed do allocate resources"), + tag); + tag->state = FINISH; + return; + case NB_ERR: + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "Internal error"), + tag); + tag->state = FINISH; + return; + default: + break; + } - grpc::Status Commit(grpc::ServerContext *context, - frr::CommitRequest const *request, - frr::CommitResponse *response) override - { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - // Request: Phase phase = 2; - int phase = request->phase(); - // Request: string comment = 3; - const std::string comment = request->comment(); + // Response: uint32 transaction_id = 1; + if (transaction_id) + tag->response.set_transaction_id( + transaction_id); - if (nb_dbg_client_grpc) - zlog_debug("received RPC Commit(candidate_id: %u)", - candidate_id); - - // Find candidate configuration. - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); - - int ret = NB_OK; - uint32_t transaction_id = 0; - - // Check for misuse of the two-phase commit protocol. - switch (phase) { - case frr::CommitRequest::PREPARE: - case frr::CommitRequest::ALL: - if (candidate->transaction) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "pending transaction in progress"); - break; - case frr::CommitRequest::ABORT: - case frr::CommitRequest::APPLY: - if (!candidate->transaction) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "no transaction in progress"); - break; - default: - break; - } + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; - // Execute the user request. - switch (phase) { - case frr::CommitRequest::VALIDATE: - ret = nb_candidate_validate(candidate->config); - break; - case frr::CommitRequest::PREPARE: - ret = nb_candidate_commit_prepare( - candidate->config, NB_CLIENT_GRPC, NULL, - comment.c_str(), &candidate->transaction); - break; - case frr::CommitRequest::ABORT: - nb_candidate_commit_abort(candidate->transaction); - break; - case frr::CommitRequest::APPLY: - nb_candidate_commit_apply(candidate->transaction, true, - &transaction_id); - break; - case frr::CommitRequest::ALL: - ret = nb_candidate_commit( - candidate->config, NB_CLIENT_GRPC, NULL, true, - comment.c_str(), &transaction_id); break; } - - // Map northbound error codes to gRPC error codes. - switch (ret) { - case NB_ERR_NO_CHANGES: - return grpc::Status( - grpc::StatusCode::ABORTED, - "No configuration changes detected"); - case NB_ERR_LOCKED: - return grpc::Status( - grpc::StatusCode::UNAVAILABLE, - "There's already a transaction in progress"); - case NB_ERR_VALIDATION: - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Validation error"); - case NB_ERR_RESOURCE: - return grpc::Status( - grpc::StatusCode::RESOURCE_EXHAUSTED, - "Failed do allocate resources"); - case NB_ERR: - return grpc::Status(grpc::StatusCode::INTERNAL, - "Internal error"); - default: - break; + case FINISH: + delete tag; } - - // Response: uint32 transaction_id = 1; - if (transaction_id) - response->set_transaction_id(transaction_id); - - return grpc::Status::OK; } - grpc::Status - ListTransactions(grpc::ServerContext *context, - frr::ListTransactionsRequest const *request, - grpc::ServerWriter - *writer) override + void + HandleListTransactions(RpcState *tag) { if (nb_dbg_client_grpc) zlog_debug("received RPC ListTransactions()"); - nb_db_transactions_iterate(list_transactions_cb, writer); - - return grpc::Status::OK; - } + switch (tag->state) { + case CREATE: { + REQUEST_RPC_STREAMING(ListTransactions); + tag->context = new std::list>(); + nb_db_transactions_iterate(list_transactions_cb, + tag->context); + tag->state = PROCESS; + } + case PROCESS: { + auto list = static_cast> *>( + tag->context); + if (list->empty()) { + tag->async_responder.Finish(grpc::Status::OK, + tag); + tag->state = FINISH; + return; + } + auto item = list->back(); - grpc::Status - GetTransaction(grpc::ServerContext *context, - frr::GetTransactionRequest const *request, - frr::GetTransactionResponse *response) override - { - struct nb_config *nb_config; - // Request: uint32 transaction_id = 1; - uint32_t transaction_id = request->transaction_id(); - // Request: Encoding encoding = 2; - frr::Encoding encoding = request->encoding(); - // Request: bool with_defaults = 3; - bool with_defaults = request->with_defaults(); + frr::ListTransactionsResponse response; - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC GetTransaction(transaction_id: %u, encoding: %u)", - transaction_id, encoding); + // Response: uint32 id = 1; + response.set_id(std::get<0>(item)); - // Load configuration from the transactions database. - nb_config = nb_db_transaction_load(transaction_id); - if (!nb_config) - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Transaction not found"); + // Response: string client = 2; + response.set_client(std::get<1>(item).c_str()); - // Response: DataTree config = 1; - auto config = response->mutable_config(); - config->set_encoding(encoding); + // Response: string date = 3; + response.set_date(std::get<2>(item).c_str()); - // Dump data using the requested format. - if (data_tree_from_dnode(config, nb_config->dnode, - encoding2lyd_format(encoding), - with_defaults) - != 0) { - nb_config_free(nb_config); - return grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to dump data"); - } + // Response: string comment = 4; + response.set_comment(std::get<3>(item).c_str()); - nb_config_free(nb_config); + list->pop_back(); - return grpc::Status::OK; + tag->async_responder.Write(response, tag); + break; + } + case FINISH: + delete static_cast> *>( + tag->context); + delete tag; + } } - grpc::Status LockConfig(grpc::ServerContext *context, - frr::LockConfigRequest const *request, - frr::LockConfigResponse *response) override + void HandleGetTransaction(RpcState *tag) { - if (nb_dbg_client_grpc) - zlog_debug("received RPC LockConfig()"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(GetTransaction); + tag->state = PROCESS; + } + case PROCESS: { + // Request: uint32 transaction_id = 1; + uint32_t transaction_id = tag->request.transaction_id(); + // Request: Encoding encoding = 2; + frr::Encoding encoding = tag->request.encoding(); + // Request: bool with_defaults = 3; + bool with_defaults = tag->request.with_defaults(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC GetTransaction(transaction_id: %u, encoding: %u)", + transaction_id, encoding); + + struct nb_config *nb_config; + + // Load configuration from the transactions database. + nb_config = nb_db_transaction_load(transaction_id); + if (!nb_config) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Transaction not found"), + tag); + tag->state = FINISH; + return; + } - if (nb_running_lock(NB_CLIENT_GRPC, NULL)) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "running configuration is locked already"); + // Response: DataTree config = 1; + auto config = tag->response.mutable_config(); + config->set_encoding(encoding); - return grpc::Status::OK; + // Dump data using the requested format. + if (data_tree_from_dnode(config, nb_config->dnode, + encoding2lyd_format(encoding), + with_defaults) + != 0) { + nb_config_free(nb_config); + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to dump data"), + tag); + tag->state = FINISH; + return; + } + + nb_config_free(nb_config); + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status UnlockConfig(grpc::ServerContext *context, - frr::UnlockConfigRequest const *request, - frr::UnlockConfigResponse *response) override + void HandleLockConfig( + RpcState *tag) { - if (nb_dbg_client_grpc) - zlog_debug("received RPC UnlockConfig()"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(LockConfig); + tag->state = PROCESS; + } + case PROCESS: { + auto rpcState = new RpcState( + this, &NorthboundImpl::HandleLockConfig); + _service->RequestLockConfig( + &rpcState->ctx, &rpcState->request, + &rpcState->responder, _cq, _cq, rpcState); + + if (nb_dbg_client_grpc) + zlog_debug("received RPC LockConfig()"); + + if (nb_running_lock(NB_CLIENT_GRPC, NULL)) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "running configuration is locked already"), + tag); + tag->state = FINISH; + return; + } + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } + } - if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "failed to unlock the running configuration"); + void HandleUnlockConfig(RpcState *tag) + { + switch (tag->state) { + case CREATE: { + REQUEST_RPC(UnlockConfig); + tag->state = PROCESS; + } + case PROCESS: { + + if (nb_dbg_client_grpc) + zlog_debug("received RPC UnlockConfig()"); + + if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "failed to unlock the running configuration"), + tag); + tag->state = FINISH; + return; + } - return grpc::Status::OK; + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status Execute(grpc::ServerContext *context, - frr::ExecuteRequest const *request, - frr::ExecuteResponse *response) override + void + HandleExecute(RpcState *tag) { struct nb_node *nb_node; struct list *input_list; @@ -526,61 +983,100 @@ class NorthboundImpl final : public frr::Northbound::Service struct yang_data *data; const char *xpath; - // Request: string path = 1; - xpath = request->path().c_str(); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(Execute); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: string path = 1; + xpath = tag->request.path().c_str(); + + if (nb_dbg_client_grpc) + zlog_debug("received RPC Execute(path: \"%s\")", + xpath); + + if (tag->request.path().empty()) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Data path is empty"), + tag); + tag->state = FINISH; + return; + } - if (nb_dbg_client_grpc) - zlog_debug("received RPC Execute(path: \"%s\")", xpath); + nb_node = nb_node_find(xpath); + if (!nb_node) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Unknown data path"), + tag); + tag->state = FINISH; + return; + } - if (request->path().empty()) - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Data path is empty"); + input_list = yang_data_list_new(); + output_list = yang_data_list_new(); - nb_node = nb_node_find(xpath); - if (!nb_node) - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Unknown data path"); + // Read input parameters. + auto input = tag->request.input(); + for (const frr::PathValue &pv : input) { + // Request: repeated PathValue input = 2; + data = yang_data_new(pv.path().c_str(), + pv.value().c_str()); + listnode_add(input_list, data); + } - input_list = yang_data_list_new(); - output_list = yang_data_list_new(); + // Execute callback registered for this XPath. + if (nb_callback_rpc(nb_node, xpath, input_list, + output_list) + != NB_OK) { + flog_warn(EC_LIB_NB_CB_RPC, + "%s: rpc callback failed: %s", + __func__, xpath); + list_delete(&input_list); + list_delete(&output_list); + + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "RPC failed"), + tag); + tag->state = FINISH; + return; + } - // Read input parameters. - auto input = request->input(); - for (const frr::PathValue &pv : input) { - // Request: repeated PathValue input = 2; - data = yang_data_new(pv.path().c_str(), - pv.value().c_str()); - listnode_add(input_list, data); - } + // Process output parameters. + for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { + // Response: repeated PathValue output = 1; + frr::PathValue *pv = tag->response.add_output(); + pv->set_path(data->xpath); + pv->set_value(data->value); + } - // Execute callback registered for this XPath. - if (nb_callback_rpc(nb_node, xpath, input_list, output_list) - != NB_OK) { - flog_warn(EC_LIB_NB_CB_RPC, - "%s: rpc callback failed: %s", __func__, - xpath); + // Release memory. list_delete(&input_list); list_delete(&output_list); - return grpc::Status(grpc::StatusCode::INTERNAL, - "RPC failed"); - } - // Process output parameters. - for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { - // Response: repeated PathValue output = 1; - frr::PathValue *pv = response->add_output(); - pv->set_path(data->xpath); - pv->set_value(data->value); + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; } - - // Release memory. - list_delete(&input_list); - list_delete(&output_list); - - return grpc::Status::OK; } private: + frr::Northbound::AsyncService *_service; + grpc::ServerCompletionQueue *_cq; + struct candidate { uint32_t id; struct nb_config *config; @@ -649,24 +1145,12 @@ class NorthboundImpl final : public frr::Northbound::Service const char *client_name, const char *date, const char *comment) { - grpc::ServerWriter *writer = - static_cast *>(arg); - frr::ListTransactionsResponse response; - - // Response: uint32 id = 1; - response.set_id(transaction_id); - - // Response: string client = 2; - response.set_client(client_name); - - // Response: string date = 3; - response.set_date(date); - // Response: string comment = 4; - response.set_comment(comment); - - writer->Write(response); + auto list = static_cast> *>(arg); + list->push_back(std::make_tuple( + transaction_id, std::string(client_name), + std::string(date), std::string(comment))); } static int data_tree_from_dnode(frr::DataTree *dt, @@ -855,24 +1339,11 @@ static void *grpc_pthread_start(void *arg) { struct frr_pthread *fpt = static_cast(arg); unsigned long *port = static_cast(fpt->data); - NorthboundImpl service; - std::stringstream server_address; frr_pthread_set_name(fpt); - server_address << "0.0.0.0:" << *port; - - grpc::ServerBuilder builder; - builder.AddListeningPort(server_address.str(), - grpc::InsecureServerCredentials()); - builder.RegisterService(&service); - - std::unique_ptr server(builder.BuildAndStart()); - - zlog_notice("gRPC server listening on %s", - server_address.str().c_str()); - - server->Wait(); + NorthboundImpl nb; + nb.Run(*port); return NULL; } -- 2.39.5