diff options
Diffstat (limited to 'lib/northbound_grpc.cpp')
| -rw-r--r-- | lib/northbound_grpc.cpp | 1317 |
1 files changed, 894 insertions, 423 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index fd4101d679..96a11d67bd 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -54,12 +54,70 @@ static const struct frr_pthread_attr attr = { .stop = NULL, }; -class NorthboundImpl final : public frr::Northbound::Service +enum CallStatus { CREATE, PROCESS, FINISH }; + +/* Thanks gooble */ +class RpcStateBase +{ + public: + virtual void doCallback() = 0; +}; + +class NorthboundImpl; + +template <typename Q, typename S> class RpcState : RpcStateBase +{ + public: + RpcState(NorthboundImpl *svc, + void (NorthboundImpl::*cb)(RpcState<Q, S> *)) + : callback(cb), responder(&ctx), async_responder(&ctx), + service(svc){}; + + void doCallback() override + { + (service->*callback)(this); + } + + grpc::ServerContext ctx; + Q request; + S response; + grpc::ServerAsyncResponseWriter<S> responder; + grpc::ServerAsyncWriter<S> async_responder; + + NorthboundImpl *service; + void (NorthboundImpl::*callback)(RpcState<Q, S> *); + + void *context; + CallStatus state = CREATE; +}; + +#define REQUEST_RPC(NAME) \ + do { \ + auto _rpcState = \ + new RpcState<frr::NAME##Request, frr::NAME##Response>( \ + this, &NorthboundImpl::Handle##NAME); \ + _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \ + &_rpcState->responder, _cq, _cq, \ + _rpcState); \ + } while (0) + +#define REQUEST_RPC_STREAMING(NAME) \ + do { \ + auto _rpcState = \ + new RpcState<frr::NAME##Request, frr::NAME##Response>( \ + this, &NorthboundImpl::Handle##NAME); \ + _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \ + &_rpcState->async_responder, _cq, _cq, \ + _rpcState); \ + } while (0) + +class NorthboundImpl { public: NorthboundImpl(void) { _nextCandidateId = 0; + _service = new frr::Northbound::AsyncService(); } ~NorthboundImpl(void) @@ -70,61 +128,136 @@ class NorthboundImpl final : public frr::Northbound::Service delete_candidate(&it->second); } - grpc::Status - GetCapabilities(grpc::ServerContext *context, - frr::GetCapabilitiesRequest const *request, - frr::GetCapabilitiesResponse *response) override + void Run(unsigned long port) + { + grpc::ServerBuilder builder; + std::stringstream server_address; + + server_address << "0.0.0.0:" << port; + + builder.AddListeningPort(server_address.str(), + grpc::InsecureServerCredentials()); + builder.RegisterService(_service); + + auto cq = builder.AddCompletionQueue(); + _cq = cq.get(); + auto _server = builder.BuildAndStart(); + + /* Schedule all RPC handlers */ + REQUEST_RPC(GetCapabilities); + REQUEST_RPC(CreateCandidate); + REQUEST_RPC(DeleteCandidate); + REQUEST_RPC(UpdateCandidate); + REQUEST_RPC(EditCandidate); + REQUEST_RPC(LoadToCandidate); + REQUEST_RPC(Commit); + REQUEST_RPC(GetTransaction); + REQUEST_RPC(LockConfig); + REQUEST_RPC(UnlockConfig); + REQUEST_RPC(Execute); + REQUEST_RPC_STREAMING(Get); + REQUEST_RPC_STREAMING(ListTransactions); + + zlog_notice("gRPC server listening on %s", + server_address.str().c_str()); + + /* Process inbound RPCs */ + void *tag; + bool ok; + while (true) { + _cq->Next(&tag, &ok); + GPR_ASSERT(ok); + static_cast<RpcStateBase *>(tag)->doCallback(); + tag = nullptr; + } + } + + void HandleGetCapabilities(RpcState<frr::GetCapabilitiesRequest, + frr::GetCapabilitiesResponse> *tag) { if (nb_dbg_client_grpc) zlog_debug("received RPC GetCapabilities()"); - // Response: string frr_version = 1; - response->set_frr_version(FRR_VERSION); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(GetCapabilities); + tag->state = PROCESS; + } + case PROCESS: { + + // Response: string frr_version = 1; + tag->response.set_frr_version(FRR_VERSION); - // Response: bool rollback_support = 2; + // Response: bool rollback_support = 2; #ifdef HAVE_CONFIG_ROLLBACKS - response->set_rollback_support(true); + tag->response.set_rollback_support(true); #else - response->set_rollback_support(false); + tag->response.set_rollback_support(false); #endif - // Response: repeated ModuleData supported_modules = 3; - struct yang_module *module; - RB_FOREACH (module, yang_modules, &yang_modules) { - auto m = response->add_supported_modules(); + // Response: repeated ModuleData supported_modules = 3; + struct yang_module *module; + RB_FOREACH (module, yang_modules, &yang_modules) { + auto m = tag->response.add_supported_modules(); - m->set_name(module->name); - if (module->info->rev_size) - m->set_revision(module->info->rev[0].date); - m->set_organization(module->info->org); - } + m->set_name(module->name); + if (module->info->rev_size) + m->set_revision( + module->info->rev[0].date); + m->set_organization(module->info->org); + } - // Response: repeated Encoding supported_encodings = 4; - response->add_supported_encodings(frr::JSON); - response->add_supported_encodings(frr::XML); + // Response: repeated Encoding supported_encodings = 4; + tag->response.add_supported_encodings(frr::JSON); + tag->response.add_supported_encodings(frr::XML); - return grpc::Status::OK; + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status Get(grpc::ServerContext *context, - frr::GetRequest const *request, - grpc::ServerWriter<frr::GetResponse> *writer) override + void HandleGet(RpcState<frr::GetRequest, frr::GetResponse> *tag) { - // Request: DataType type = 1; - int type = request->type(); - // Request: Encoding encoding = 2; - frr::Encoding encoding = request->encoding(); - // Request: bool with_defaults = 3; - bool with_defaults = request->with_defaults(); + switch (tag->state) { + case CREATE: { + auto mypaths = new std::list<std::string>(); + tag->context = mypaths; + auto paths = tag->request.path(); + for (const std::string &path : paths) { + mypaths->push_back(std::string(path)); + } + REQUEST_RPC_STREAMING(Get); + tag->state = PROCESS; + } + case PROCESS: { + // Request: DataType type = 1; + int type = tag->request.type(); + // Request: Encoding encoding = 2; + frr::Encoding encoding = tag->request.encoding(); + // Request: bool with_defaults = 3; + bool with_defaults = tag->request.with_defaults(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC Get(type: %u, encoding: %u, with_defaults: %u)", + type, encoding, with_defaults); + + auto mypaths = static_cast<std::list<std::string> *>( + tag->context); + + if (mypaths->empty()) { + tag->async_responder.Finish(grpc::Status::OK, + tag); + tag->state = FINISH; + return; + } - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC Get(type: %u, encoding: %u, with_defaults: %u)", - type, encoding, with_defaults); - // Request: repeated string path = 4; - auto paths = request->path(); - for (const std::string &path : paths) { frr::GetResponse response; grpc::Status status; @@ -133,391 +266,715 @@ class NorthboundImpl final : public frr::Northbound::Service // Response: DataTree data = 2; auto *data = response.mutable_data(); - data->set_encoding(request->encoding()); - status = get_path(data, path, type, + data->set_encoding(tag->request.encoding()); + status = get_path(data, mypaths->back().c_str(), type, encoding2lyd_format(encoding), with_defaults); // Something went wrong... - if (!status.ok()) - return status; + if (!status.ok()) { + tag->async_responder.WriteAndFinish( + response, grpc::WriteOptions(), status, + tag); + tag->state = FINISH; + return; + } - writer->Write(response); - } + mypaths->pop_back(); - if (nb_dbg_client_grpc) - zlog_debug("received RPC Get() end"); + tag->async_responder.Write(response, tag); - return grpc::Status::OK; + break; + } + case FINISH: + if (nb_dbg_client_grpc) + zlog_debug("received RPC Get() end"); + + delete static_cast<std::list<std::string> *>( + tag->context); + delete tag; + } } - grpc::Status - CreateCandidate(grpc::ServerContext *context, - frr::CreateCandidateRequest const *request, - frr::CreateCandidateResponse *response) override + void HandleCreateCandidate(RpcState<frr::CreateCandidateRequest, + frr::CreateCandidateResponse> *tag) { if (nb_dbg_client_grpc) zlog_debug("received RPC CreateCandidate()"); - struct candidate *candidate = create_candidate(); - if (!candidate) - return grpc::Status( - grpc::StatusCode::RESOURCE_EXHAUSTED, - "Can't create candidate configuration"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(CreateCandidate); + tag->state = PROCESS; + } + case PROCESS: { + struct candidate *candidate = create_candidate(); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + RESOURCE_EXHAUSTED, + "Can't create candidate configuration"), + tag); + } else { + tag->response.set_candidate_id(candidate->id); + tag->responder.Finish(tag->response, + grpc::Status::OK, tag); + } - // Response: uint32 candidate_id = 1; - response->set_candidate_id(candidate->id); + tag->state = FINISH; - return grpc::Status::OK; + break; + } + case FINISH: + delete tag; + } } - grpc::Status - DeleteCandidate(grpc::ServerContext *context, - frr::DeleteCandidateRequest const *request, - frr::DeleteCandidateResponse *response) override + void HandleDeleteCandidate(RpcState<frr::DeleteCandidateRequest, + frr::DeleteCandidateResponse> *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC DeleteCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(DeleteCandidate); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC DeleteCandidate(candidate_id: %u)", + candidate_id); + + struct candidate *candidate = + get_candidate(candidate_id); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } else { + delete_candidate(candidate); + tag->responder.Finish(tag->response, + grpc::Status::OK, tag); + tag->state = FINISH; + return; + } + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } + } - delete_candidate(candidate); + void HandleUpdateCandidate(RpcState<frr::UpdateCandidateRequest, + frr::UpdateCandidateResponse> *tag) + { + switch (tag->state) { + case CREATE: { + REQUEST_RPC(UpdateCandidate); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC UpdateCandidate(candidate_id: %u)", + candidate_id); + + struct candidate *candidate = + get_candidate(candidate_id); + + if (!candidate) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + else if (candidate->transaction) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "candidate is in the middle of a transaction"), + tag); + else if (nb_candidate_update(candidate->config) + != NB_OK) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "failed to update candidate configuration"), + tag); + + else + tag->responder.Finish(tag->response, + grpc::Status::OK, tag); + + tag->state = FINISH; - return grpc::Status::OK; + break; + } + case FINISH: + delete tag; + } } - grpc::Status - UpdateCandidate(grpc::ServerContext *context, - frr::UpdateCandidateRequest const *request, - frr::UpdateCandidateResponse *response) override + void HandleEditCandidate(RpcState<frr::EditCandidateRequest, + frr::EditCandidateResponse> *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(EditCandidate); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC EditCandidate(candidate_id: %u)", + candidate_id); + + struct candidate *candidate = + get_candidate(candidate_id); + + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + break; + } - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC UpdateCandidate(candidate_id: %u)", - candidate_id); + struct nb_config *candidate_tmp = + nb_config_dup(candidate->config); + + auto pvs = tag->request.update(); + for (const frr::PathValue &pv : pvs) { + if (yang_dnode_edit(candidate_tmp->dnode, + pv.path(), pv.value()) + != 0) { + nb_config_free(candidate_tmp); + + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + INVALID_ARGUMENT, + "Failed to update \"" + + pv.path() + + "\""), + tag); + + tag->state = FINISH; + return; + } + } + + pvs = tag->request.delete_(); + for (const frr::PathValue &pv : pvs) { + if (yang_dnode_delete(candidate_tmp->dnode, + pv.path()) + != 0) { + nb_config_free(candidate_tmp); + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + INVALID_ARGUMENT, + "Failed to remove \"" + + pv.path() + + "\""), + tag); + tag->state = FINISH; + return; + } + } - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); + // No errors, accept all changes. + nb_config_replace(candidate->config, candidate_tmp, + false); - if (candidate->transaction) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "candidate is in the middle of a transaction"); + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); - if (nb_candidate_update(candidate->config) != NB_OK) - return grpc::Status( - grpc::StatusCode::INTERNAL, - "failed to update candidate configuration"); + tag->state = FINISH; - return grpc::Status::OK; + break; + } + case FINISH: + delete tag; + } } - grpc::Status - EditCandidate(grpc::ServerContext *context, - frr::EditCandidateRequest const *request, - frr::EditCandidateResponse *response) override + void HandleLoadToCandidate(RpcState<frr::LoadToCandidateRequest, + frr::LoadToCandidateResponse> *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC EditCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); - - // Create a copy of the candidate. For consistency, we need to - // ensure that either all changes are accepted or none are (in - // the event of an error). - struct nb_config *candidate_tmp = - nb_config_dup(candidate->config); - - auto pvs = request->update(); - for (const frr::PathValue &pv : pvs) { - if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), - pv.value()) - != 0) { - nb_config_free(candidate_tmp); - return grpc::Status( - grpc::StatusCode::INVALID_ARGUMENT, - "Failed to update \"" + pv.path() - + "\""); - } + switch (tag->state) { + case CREATE: { + REQUEST_RPC(LoadToCandidate); + tag->state = PROCESS; } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC LoadToCandidate(candidate_id: %u)", + candidate_id); + + // Request: LoadType type = 2; + int load_type = tag->request.type(); + // Request: DataTree config = 3; + auto config = tag->request.config(); + + + struct candidate *candidate = + get_candidate(candidate_id); + + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } - pvs = request->delete_(); - for (const frr::PathValue &pv : pvs) { - if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) - != 0) { - nb_config_free(candidate_tmp); - return grpc::Status( - grpc::StatusCode::INVALID_ARGUMENT, - "Failed to remove \"" + pv.path() - + "\""); + struct lyd_node *dnode = + dnode_from_data_tree(&config, true); + if (!dnode) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "Failed to parse the configuration"), + tag); + tag->state = FINISH; + return; } - } - // No errors, accept all changes. - nb_config_replace(candidate->config, candidate_tmp, false); + struct nb_config *loaded_config = nb_config_new(dnode); + + if (load_type == frr::LoadToCandidateRequest::REPLACE) + nb_config_replace(candidate->config, + loaded_config, false); + else if (nb_config_merge(candidate->config, + loaded_config, false) + != NB_OK) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "Failed to merge the loaded configuration"), + tag); + tag->state = FINISH; + return; + } - return grpc::Status::OK; + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status - LoadToCandidate(grpc::ServerContext *context, - frr::LoadToCandidateRequest const *request, - frr::LoadToCandidateResponse *response) override + void + HandleCommit(RpcState<frr::CommitRequest, frr::CommitResponse> *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - // Request: LoadType type = 2; - int load_type = request->type(); - // Request: DataTree config = 3; - auto config = request->config(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC LoadToCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(Commit); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC Commit(candidate_id: %u)", + candidate_id); + + // Request: Phase phase = 2; + int phase = tag->request.phase(); + // Request: string comment = 3; + const std::string comment = tag->request.comment(); + + // Find candidate configuration. + struct candidate *candidate = + get_candidate(candidate_id); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } - struct lyd_node *dnode = dnode_from_data_tree(&config, true); - if (!dnode) - return grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to parse the configuration"); + int ret = NB_OK; + uint32_t transaction_id = 0; + + // Check for misuse of the two-phase commit protocol. + switch (phase) { + case frr::CommitRequest::PREPARE: + case frr::CommitRequest::ALL: + if (candidate->transaction) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "candidate is in the middle of a transaction"), + tag); + tag->state = FINISH; + return; + } + break; + case frr::CommitRequest::ABORT: + case frr::CommitRequest::APPLY: + if (!candidate->transaction) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "no transaction in progress"), + tag); + tag->state = FINISH; + return; + } + break; + default: + break; + } - struct nb_config *loaded_config = nb_config_new(dnode); - if (load_type == frr::LoadToCandidateRequest::REPLACE) - nb_config_replace(candidate->config, loaded_config, - false); - else if (nb_config_merge(candidate->config, loaded_config, - false) - != NB_OK) - return grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to merge the loaded configuration"); + // Execute the user request. + switch (phase) { + case frr::CommitRequest::VALIDATE: + ret = nb_candidate_validate(candidate->config); + break; + case frr::CommitRequest::PREPARE: + ret = nb_candidate_commit_prepare( + candidate->config, NB_CLIENT_GRPC, NULL, + comment.c_str(), + &candidate->transaction); + break; + case frr::CommitRequest::ABORT: + nb_candidate_commit_abort( + candidate->transaction); + break; + case frr::CommitRequest::APPLY: + nb_candidate_commit_apply( + candidate->transaction, true, + &transaction_id); + break; + case frr::CommitRequest::ALL: + ret = nb_candidate_commit( + candidate->config, NB_CLIENT_GRPC, NULL, + true, comment.c_str(), &transaction_id); + break; + } - return grpc::Status::OK; - } + // Map northbound error codes to gRPC error codes. + switch (ret) { + case NB_ERR_NO_CHANGES: + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::ABORTED, + "No configuration changes detected"), + tag); + tag->state = FINISH; + return; + case NB_ERR_LOCKED: + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::UNAVAILABLE, + "There's already a transaction in progress"), + tag); + tag->state = FINISH; + return; + case NB_ERR_VALIDATION: + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Validation error"), + tag); + tag->state = FINISH; + return; + case NB_ERR_RESOURCE: + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + RESOURCE_EXHAUSTED, + "Failed do allocate resources"), + tag); + tag->state = FINISH; + return; + case NB_ERR: + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "Internal error"), + tag); + tag->state = FINISH; + return; + default: + break; + } - grpc::Status Commit(grpc::ServerContext *context, - frr::CommitRequest const *request, - frr::CommitResponse *response) override - { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - // Request: Phase phase = 2; - int phase = request->phase(); - // Request: string comment = 3; - const std::string comment = request->comment(); + // Response: uint32 transaction_id = 1; + if (transaction_id) + tag->response.set_transaction_id( + transaction_id); - if (nb_dbg_client_grpc) - zlog_debug("received RPC Commit(candidate_id: %u)", - candidate_id); - - // Find candidate configuration. - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); - - int ret = NB_OK; - uint32_t transaction_id = 0; - - // Check for misuse of the two-phase commit protocol. - switch (phase) { - case frr::CommitRequest::PREPARE: - case frr::CommitRequest::ALL: - if (candidate->transaction) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "pending transaction in progress"); - break; - case frr::CommitRequest::ABORT: - case frr::CommitRequest::APPLY: - if (!candidate->transaction) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "no transaction in progress"); - break; - default: - break; - } + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; - // Execute the user request. - switch (phase) { - case frr::CommitRequest::VALIDATE: - ret = nb_candidate_validate(candidate->config); - break; - case frr::CommitRequest::PREPARE: - ret = nb_candidate_commit_prepare( - candidate->config, NB_CLIENT_GRPC, NULL, - comment.c_str(), &candidate->transaction); - break; - case frr::CommitRequest::ABORT: - nb_candidate_commit_abort(candidate->transaction); - break; - case frr::CommitRequest::APPLY: - nb_candidate_commit_apply(candidate->transaction, true, - &transaction_id); - break; - case frr::CommitRequest::ALL: - ret = nb_candidate_commit( - candidate->config, NB_CLIENT_GRPC, NULL, true, - comment.c_str(), &transaction_id); break; } - - // Map northbound error codes to gRPC error codes. - switch (ret) { - case NB_ERR_NO_CHANGES: - return grpc::Status( - grpc::StatusCode::ABORTED, - "No configuration changes detected"); - case NB_ERR_LOCKED: - return grpc::Status( - grpc::StatusCode::UNAVAILABLE, - "There's already a transaction in progress"); - case NB_ERR_VALIDATION: - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Validation error"); - case NB_ERR_RESOURCE: - return grpc::Status( - grpc::StatusCode::RESOURCE_EXHAUSTED, - "Failed do allocate resources"); - case NB_ERR: - return grpc::Status(grpc::StatusCode::INTERNAL, - "Internal error"); - default: - break; + case FINISH: + delete tag; } - - // Response: uint32 transaction_id = 1; - if (transaction_id) - response->set_transaction_id(transaction_id); - - return grpc::Status::OK; } - grpc::Status - ListTransactions(grpc::ServerContext *context, - frr::ListTransactionsRequest const *request, - grpc::ServerWriter<frr::ListTransactionsResponse> - *writer) override + void + HandleListTransactions(RpcState<frr::ListTransactionsRequest, + frr::ListTransactionsResponse> *tag) { if (nb_dbg_client_grpc) zlog_debug("received RPC ListTransactions()"); - nb_db_transactions_iterate(list_transactions_cb, writer); - - return grpc::Status::OK; - } + switch (tag->state) { + case CREATE: { + REQUEST_RPC_STREAMING(ListTransactions); + tag->context = new std::list<std::tuple< + int, std::string, std::string, std::string>>(); + nb_db_transactions_iterate(list_transactions_cb, + tag->context); + tag->state = PROCESS; + } + case PROCESS: { + auto list = static_cast<std::list<std::tuple< + int, std::string, std::string, std::string>> *>( + tag->context); + if (list->empty()) { + tag->async_responder.Finish(grpc::Status::OK, + tag); + tag->state = FINISH; + return; + } + auto item = list->back(); - grpc::Status - GetTransaction(grpc::ServerContext *context, - frr::GetTransactionRequest const *request, - frr::GetTransactionResponse *response) override - { - struct nb_config *nb_config; - // Request: uint32 transaction_id = 1; - uint32_t transaction_id = request->transaction_id(); - // Request: Encoding encoding = 2; - frr::Encoding encoding = request->encoding(); - // Request: bool with_defaults = 3; - bool with_defaults = request->with_defaults(); + frr::ListTransactionsResponse response; - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC GetTransaction(transaction_id: %u, encoding: %u)", - transaction_id, encoding); + // Response: uint32 id = 1; + response.set_id(std::get<0>(item)); - // Load configuration from the transactions database. - nb_config = nb_db_transaction_load(transaction_id); - if (!nb_config) - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Transaction not found"); + // Response: string client = 2; + response.set_client(std::get<1>(item).c_str()); - // Response: DataTree config = 1; - auto config = response->mutable_config(); - config->set_encoding(encoding); + // Response: string date = 3; + response.set_date(std::get<2>(item).c_str()); - // Dump data using the requested format. - if (data_tree_from_dnode(config, nb_config->dnode, - encoding2lyd_format(encoding), - with_defaults) - != 0) { - nb_config_free(nb_config); - return grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to dump data"); - } + // Response: string comment = 4; + response.set_comment(std::get<3>(item).c_str()); - nb_config_free(nb_config); + list->pop_back(); - return grpc::Status::OK; + tag->async_responder.Write(response, tag); + break; + } + case FINISH: + delete static_cast<std::list<std::tuple< + int, std::string, std::string, std::string>> *>( + tag->context); + delete tag; + } } - grpc::Status LockConfig(grpc::ServerContext *context, - frr::LockConfigRequest const *request, - frr::LockConfigResponse *response) override + void HandleGetTransaction(RpcState<frr::GetTransactionRequest, + frr::GetTransactionResponse> *tag) { - if (nb_dbg_client_grpc) - zlog_debug("received RPC LockConfig()"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(GetTransaction); + tag->state = PROCESS; + } + case PROCESS: { + // Request: uint32 transaction_id = 1; + uint32_t transaction_id = tag->request.transaction_id(); + // Request: Encoding encoding = 2; + frr::Encoding encoding = tag->request.encoding(); + // Request: bool with_defaults = 3; + bool with_defaults = tag->request.with_defaults(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC GetTransaction(transaction_id: %u, encoding: %u)", + transaction_id, encoding); + + struct nb_config *nb_config; + + // Load configuration from the transactions database. + nb_config = nb_db_transaction_load(transaction_id); + if (!nb_config) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Transaction not found"), + tag); + tag->state = FINISH; + return; + } - if (nb_running_lock(NB_CLIENT_GRPC, NULL)) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "running configuration is locked already"); + // Response: DataTree config = 1; + auto config = tag->response.mutable_config(); + config->set_encoding(encoding); - return grpc::Status::OK; + // Dump data using the requested format. + if (data_tree_from_dnode(config, nb_config->dnode, + encoding2lyd_format(encoding), + with_defaults) + != 0) { + nb_config_free(nb_config); + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to dump data"), + tag); + tag->state = FINISH; + return; + } + + nb_config_free(nb_config); + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status UnlockConfig(grpc::ServerContext *context, - frr::UnlockConfigRequest const *request, - frr::UnlockConfigResponse *response) override + void HandleLockConfig( + RpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) { - if (nb_dbg_client_grpc) - zlog_debug("received RPC UnlockConfig()"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(LockConfig); + tag->state = PROCESS; + } + case PROCESS: { + auto rpcState = new RpcState<frr::LockConfigRequest, + frr::LockConfigResponse>( + this, &NorthboundImpl::HandleLockConfig); + _service->RequestLockConfig( + &rpcState->ctx, &rpcState->request, + &rpcState->responder, _cq, _cq, rpcState); + + if (nb_dbg_client_grpc) + zlog_debug("received RPC LockConfig()"); + + if (nb_running_lock(NB_CLIENT_GRPC, NULL)) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "running configuration is locked already"), + tag); + tag->state = FINISH; + return; + } + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } + } - if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "failed to unlock the running configuration"); + void HandleUnlockConfig(RpcState<frr::UnlockConfigRequest, + frr::UnlockConfigResponse> *tag) + { + switch (tag->state) { + case CREATE: { + REQUEST_RPC(UnlockConfig); + tag->state = PROCESS; + } + case PROCESS: { + + if (nb_dbg_client_grpc) + zlog_debug("received RPC UnlockConfig()"); + + if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "failed to unlock the running configuration"), + tag); + tag->state = FINISH; + return; + } - return grpc::Status::OK; + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status Execute(grpc::ServerContext *context, - frr::ExecuteRequest const *request, - frr::ExecuteResponse *response) override + void + HandleExecute(RpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) { struct nb_node *nb_node; struct list *input_list; @@ -526,61 +983,100 @@ class NorthboundImpl final : public frr::Northbound::Service struct yang_data *data; const char *xpath; - // Request: string path = 1; - xpath = request->path().c_str(); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(Execute); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: string path = 1; + xpath = tag->request.path().c_str(); + + if (nb_dbg_client_grpc) + zlog_debug("received RPC Execute(path: \"%s\")", + xpath); + + if (tag->request.path().empty()) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Data path is empty"), + tag); + tag->state = FINISH; + return; + } - if (nb_dbg_client_grpc) - zlog_debug("received RPC Execute(path: \"%s\")", xpath); + nb_node = nb_node_find(xpath); + if (!nb_node) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Unknown data path"), + tag); + tag->state = FINISH; + return; + } - if (request->path().empty()) - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Data path is empty"); + input_list = yang_data_list_new(); + output_list = yang_data_list_new(); - nb_node = nb_node_find(xpath); - if (!nb_node) - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Unknown data path"); + // Read input parameters. + auto input = tag->request.input(); + for (const frr::PathValue &pv : input) { + // Request: repeated PathValue input = 2; + data = yang_data_new(pv.path().c_str(), + pv.value().c_str()); + listnode_add(input_list, data); + } - input_list = yang_data_list_new(); - output_list = yang_data_list_new(); + // Execute callback registered for this XPath. + if (nb_callback_rpc(nb_node, xpath, input_list, + output_list) + != NB_OK) { + flog_warn(EC_LIB_NB_CB_RPC, + "%s: rpc callback failed: %s", + __func__, xpath); + list_delete(&input_list); + list_delete(&output_list); + + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "RPC failed"), + tag); + tag->state = FINISH; + return; + } - // Read input parameters. - auto input = request->input(); - for (const frr::PathValue &pv : input) { - // Request: repeated PathValue input = 2; - data = yang_data_new(pv.path().c_str(), - pv.value().c_str()); - listnode_add(input_list, data); - } + // Process output parameters. + for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { + // Response: repeated PathValue output = 1; + frr::PathValue *pv = tag->response.add_output(); + pv->set_path(data->xpath); + pv->set_value(data->value); + } - // Execute callback registered for this XPath. - if (nb_callback_rpc(nb_node, xpath, input_list, output_list) - != NB_OK) { - flog_warn(EC_LIB_NB_CB_RPC, - "%s: rpc callback failed: %s", __func__, - xpath); + // Release memory. list_delete(&input_list); list_delete(&output_list); - return grpc::Status(grpc::StatusCode::INTERNAL, - "RPC failed"); - } - // Process output parameters. - for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { - // Response: repeated PathValue output = 1; - frr::PathValue *pv = response->add_output(); - pv->set_path(data->xpath); - pv->set_value(data->value); + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; } - - // Release memory. - list_delete(&input_list); - list_delete(&output_list); - - return grpc::Status::OK; } private: + frr::Northbound::AsyncService *_service; + grpc::ServerCompletionQueue *_cq; + struct candidate { uint32_t id; struct nb_config *config; @@ -649,24 +1145,12 @@ class NorthboundImpl final : public frr::Northbound::Service const char *client_name, const char *date, const char *comment) { - grpc::ServerWriter<frr::ListTransactionsResponse> *writer = - static_cast<grpc::ServerWriter< - frr::ListTransactionsResponse> *>(arg); - frr::ListTransactionsResponse response; - - // Response: uint32 id = 1; - response.set_id(transaction_id); - - // Response: string client = 2; - response.set_client(client_name); - - // Response: string date = 3; - response.set_date(date); - // Response: string comment = 4; - response.set_comment(comment); - - writer->Write(response); + auto list = static_cast<std::list<std::tuple< + int, std::string, std::string, std::string>> *>(arg); + list->push_back(std::make_tuple( + transaction_id, std::string(client_name), + std::string(date), std::string(comment))); } static int data_tree_from_dnode(frr::DataTree *dt, @@ -855,24 +1339,11 @@ static void *grpc_pthread_start(void *arg) { struct frr_pthread *fpt = static_cast<frr_pthread *>(arg); unsigned long *port = static_cast<unsigned long *>(fpt->data); - NorthboundImpl service; - std::stringstream server_address; frr_pthread_set_name(fpt); - server_address << "0.0.0.0:" << *port; - - grpc::ServerBuilder builder; - builder.AddListeningPort(server_address.str(), - grpc::InsecureServerCredentials()); - builder.RegisterService(&service); - - std::unique_ptr<grpc::Server> server(builder.BuildAndStart()); - - zlog_notice("gRPC server listening on %s", - server_address.str().c_str()); - - server->Wait(); + NorthboundImpl nb; + nb.Run(*port); return NULL; } |
