diff options
Diffstat (limited to 'lib/northbound_grpc.cpp')
| -rw-r--r-- | lib/northbound_grpc.cpp | 935 |
1 files changed, 426 insertions, 509 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index 34bb1e4986..dda1756214 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -1,7 +1,7 @@ // +// Copyright (c) 2021-2022, LabN Consulting, L.L.C // Copyright (C) 2019 NetDEF, Inc. // Renato Westphal -// Copyright (c) 2021, LabN Consulting, L.L.C // // This program is free software; you can redistribute it and/or modify it // under the terms of the GNU General Public License as published by the Free @@ -39,6 +39,11 @@ #define GRPC_DEFAULT_PORT 50051 + +// ------------------------------------------------------ +// File Local Variables +// ------------------------------------------------------ + /* * NOTE: we can't use the FRR debugging infrastructure here since it uses * atomics and C++ has a different atomics API. Enable gRPC debugging @@ -50,6 +55,8 @@ static struct thread_master *main_master; static struct frr_pthread *fpt; +static bool grpc_running; + #define grpc_debug(...) \ do { \ if (nb_dbg_client_grpc) \ @@ -76,7 +83,7 @@ class Candidates { // Delete candidates. for (auto it = _cdb.begin(); it != _cdb.end(); it++) - delete_candidate(&it->second); + delete_candidate(it->first); } struct candidate *create_candidate(void) @@ -92,123 +99,106 @@ class Candidates return c; } - void delete_candidate(struct candidate *c) + bool contains(uint64_t candidate_id) + { + return _cdb.count(candidate_id) > 0; + } + + void delete_candidate(uint64_t candidate_id) { + struct candidate *c = &_cdb[candidate_id]; char errmsg[BUFSIZ] = {0}; - _cdb.erase(c->id); nb_config_free(c->config); if (c->transaction) nb_candidate_commit_abort(c->transaction, errmsg, sizeof(errmsg)); + _cdb.erase(c->id); } - struct candidate *get_candidate(uint32_t id) + struct candidate *get_candidate(uint64_t id) { return _cdb.count(id) == 0 ? NULL : &_cdb[id]; } private: uint64_t _next_id = 0; - std::map<uint32_t, struct candidate> _cdb; + std::map<uint64_t, struct candidate> _cdb; }; +/* + * RpcStateBase is the common base class used to track a gRPC RPC. + */ class RpcStateBase { public: - virtual CallState doCallback() = 0; virtual void do_request(::frr::Northbound::AsyncService *service, - ::grpc::ServerCompletionQueue *cq) = 0; -}; + ::grpc::ServerCompletionQueue *cq, + bool no_copy) = 0; -/* - * The RPC state class is used to track the execution of an RPC. - */ -template <typename Q, typename S> class NewRpcState : RpcStateBase -{ - typedef void (frr::Northbound::AsyncService::*reqfunc_t)( - ::grpc::ServerContext *, Q *, - ::grpc::ServerAsyncResponseWriter<S> *, - ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, - void *); - typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( - ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *, - ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, - void *); + RpcStateBase(const char *name) : name(name){}; - public: - NewRpcState(Candidates *cdb, reqfunc_t rfunc, - void (*cb)(NewRpcState<Q, S> *), const char *name) - : requestf(rfunc), callback(cb), responder(&ctx), - async_responder(&ctx), name(name), cdb(cdb){}; - NewRpcState(Candidates *cdb, reqsfunc_t rfunc, - void (*cb)(NewRpcState<Q, S> *), const char *name) - : requestsf(rfunc), callback(cb), responder(&ctx), - async_responder(&ctx), name(name), cdb(cdb){}; - - CallState doCallback() override + virtual ~RpcStateBase() = default; + + CallState get_state() const { - CallState enter_state = this->state; - CallState new_state; - if (enter_state == FINISH) { - grpc_debug("%s RPC FINISH -> DELETED", name); - new_state = FINISH; - } else { - grpc_debug("%s RPC: %s -> PROCESS", name, - call_states[this->state]); - new_state = PROCESS; - } + return state; + } + + bool is_initial_process() const + { + /* Will always be true for Unary */ + return entered_state == CREATE; + } + + // Returns "more" status, if false caller can delete + bool run(frr::Northbound::AsyncService *service, + grpc::ServerCompletionQueue *cq) + { + /* + * We enter in either CREATE or MORE state, and transition to + * PROCESS state. + */ + this->entered_state = this->state; + this->state = PROCESS; + grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name, + call_states[this->entered_state], + call_states[this->state]); /* - * We are either in state CREATE, MORE or FINISH. If CREATE or - * MORE move back to PROCESS, otherwise we are cleaning up - * (FINISH) so leave it in that state. Run the callback on the - * main threadmaster/pthread; and wait for expected transition - * from main thread. If transition is to FINISH->DELETED. - * delete us. - * - * We update the state prior to scheduling the callback which - * may then update the state in the master pthread. Then we - * obtain the lock in the condvar-check-loop as the callback - * will be modifying updating the state value. + * We schedule the callback on the main pthread, and wait for + * the state to transition out of the PROCESS state. The new + * state will either be MORE or FINISH. It will always be FINISH + * for Unary RPCs. */ - this->state = new_state; thread_add_event(main_master, c_callback, (void *)this, 0, NULL); + pthread_mutex_lock(&this->cmux); - while (this->state == new_state) + while (this->state == PROCESS) pthread_cond_wait(&this->cond, &this->cmux); pthread_mutex_unlock(&this->cmux); - if (this->state == DELETED) { - grpc_debug("%s RPC: -> [DELETED]", name); - delete this; - return DELETED; - } - return this->state; - } - - void do_request(::frr::Northbound::AsyncService *service, - ::grpc::ServerCompletionQueue *cq) override - { - grpc_debug("%s, posting a request for: %s", __func__, name); - if (requestf) { - NewRpcState<Q, S> *copy = - new NewRpcState(cdb, requestf, callback, name); - (service->*requestf)(©->ctx, ©->request, - ©->responder, cq, cq, copy); - } else { - NewRpcState<Q, S> *copy = - new NewRpcState(cdb, requestsf, callback, name); - (service->*requestsf)(©->ctx, ©->request, - ©->async_responder, cq, cq, - copy); + grpc_debug("%s RPC in %s on grpc-io-thread", name, + call_states[this->state]); + + if (this->state == FINISH) { + /* + * Server is done (FINISH) so prep to receive a new + * request of this type. We could do this earlier but + * that would mean we could be handling multiple same + * type requests in parallel without limit. + */ + this->do_request(service, cq, false); } + return true; } + protected: + virtual CallState run_mainthread(struct thread *thread) = 0; static void c_callback(struct thread *thread) { - auto _tag = static_cast<NewRpcState<Q, S> *>(thread->arg); + auto _tag = static_cast<RpcStateBase *>(thread->arg); /* * We hold the lock until the callback finishes and has updated * _tag->state, then we signal done and release. @@ -216,36 +206,131 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase pthread_mutex_lock(&_tag->cmux); CallState enter_state = _tag->state; - grpc_debug("%s RPC running on main thread", _tag->name); + grpc_debug("%s RPC: running %s on main thread", _tag->name, + call_states[enter_state]); - _tag->callback(_tag); + _tag->state = _tag->run_mainthread(thread); - grpc_debug("%s RPC: %s -> %s", _tag->name, + grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name, call_states[enter_state], call_states[_tag->state]); pthread_cond_signal(&_tag->cond); pthread_mutex_unlock(&_tag->cmux); return; } - NewRpcState<Q, S> *orig; - const char *name; grpc::ServerContext ctx; + pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + CallState state = CREATE; + CallState entered_state; + + public: + const char *name; +}; + +/* + * The UnaryRpcState class is used to track the execution of a Unary RPC. + * + * Template Args: + * Q - the request type for a given unary RPC + * S - the response type for a given unary RPC + */ +template <typename Q, typename S> class UnaryRpcState : public RpcStateBase +{ + public: + typedef void (frr::Northbound::AsyncService::*reqfunc_t)( + ::grpc::ServerContext *, Q *, + ::grpc::ServerAsyncResponseWriter<S> *, + ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, + void *); + + UnaryRpcState(Candidates *cdb, reqfunc_t rfunc, + grpc::Status (*cb)(UnaryRpcState<Q, S> *), + const char *name) + : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb), + responder(&ctx){}; + + void do_request(::frr::Northbound::AsyncService *service, + ::grpc::ServerCompletionQueue *cq, + bool no_copy) override + { + grpc_debug("%s, posting a request for: %s", __func__, name); + auto copy = no_copy ? this + : new UnaryRpcState(cdb, requestf, callback, + name); + (service->*requestf)(©->ctx, ©->request, + ©->responder, cq, cq, copy); + } + + CallState run_mainthread(struct thread *thread) override + { + // Unary RPC are always finished, see "Unary" :) + grpc::Status status = this->callback(this); + responder.Finish(response, status, this); + return FINISH; + } + + Candidates *cdb; + Q request; S response; grpc::ServerAsyncResponseWriter<S> responder; - grpc::ServerAsyncWriter<S> async_responder; - Candidates *cdb; - void (*callback)(NewRpcState<Q, S> *); - reqfunc_t requestf; - reqsfunc_t requestsf; + grpc::Status (*callback)(UnaryRpcState<Q, S> *); + reqfunc_t requestf = NULL; +}; - pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; - pthread_cond_t cond = PTHREAD_COND_INITIALIZER; - void *context; +/* + * The StreamRpcState class is used to track the execution of a Streaming RPC. + * + * Template Args: + * Q - the request type for a given streaming RPC + * S - the response type for a given streaming RPC + * X - the type used to track the streaming state + */ +template <typename Q, typename S, typename X> +class StreamRpcState : public RpcStateBase +{ + public: + typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( + ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *, + ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, + void *); - CallState state = CREATE; + StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *), + const char *name) + : RpcStateBase(name), requestsf(rfunc), callback(cb), + async_responder(&ctx){}; + + void do_request(::frr::Northbound::AsyncService *service, + ::grpc::ServerCompletionQueue *cq, + bool no_copy) override + { + grpc_debug("%s, posting a request for: %s", __func__, name); + auto copy = + no_copy ? this + : new StreamRpcState(requestsf, callback, name); + (service->*requestsf)(©->ctx, ©->request, + ©->async_responder, cq, cq, copy); + } + + CallState run_mainthread(struct thread *thread) override + { + if (this->callback(this)) + return MORE; + else + return FINISH; + } + + Q request; + S response; + grpc::ServerAsyncWriter<S> async_responder; + + bool (*callback)(StreamRpcState<Q, S, X> *); + reqsfunc_t requestsf = NULL; + + X context; }; // ------------------------------------------------------ @@ -268,10 +353,10 @@ static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding) } static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path, - const std::string &value) + const char *value) { - LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), - value.c_str(), LYD_NEW_PATH_UPDATE, &dnode); + LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value, + LYD_NEW_PATH_UPDATE, &dnode); if (err != LY_SUCCESS) { flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s", __func__, ly_errmsg(ly_native_ctx)); @@ -464,15 +549,11 @@ static grpc::Status get_path(frr::DataTree *dt, const std::string &path, // RPC Callback Functions: run on main thread // ------------------------------------------------------ -void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest, - frr::GetCapabilitiesResponse> *tag) +grpc::Status HandleUnaryGetCapabilities( + UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); // Response: string frr_version = 1; tag->response.set_frr_version(FRR_VERSION); @@ -498,30 +579,24 @@ void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest, tag->response.add_supported_encodings(frr::JSON); tag->response.add_supported_encodings(frr::XML); - /* Should we do this in the async process call? */ - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - - /* Indicate we are done. */ - tag->state = FINISH; + return grpc::Status::OK; } -void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag) -{ - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); +// Define the context variable type for this streaming handler +typedef std::list<std::string> GetContextType; - if (tag->state == FINISH) { - delete static_cast<std::list<std::string> *>(tag->context); - tag->state = DELETED; - return; - } +bool HandleStreamingGet( + StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag) +{ + grpc_debug("%s: entered", __func__); - if (!tag->context) { - /* Creating, first time called for this RPC */ - auto mypaths = new std::list<std::string>(); - tag->context = mypaths; + auto mypathps = &tag->context; + if (tag->is_initial_process()) { + // Fill our context container first time through + grpc_debug("%s: initialize streaming state", __func__); auto paths = tag->request.path(); for (const std::string &path : paths) { - mypaths->push_back(std::string(path)); + mypathps->push_back(std::string(path)); } } @@ -532,11 +607,9 @@ void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag) // Request: bool with_defaults = 3; bool with_defaults = tag->request.with_defaults(); - auto mypathps = static_cast<std::list<std::string> *>(tag->context); if (mypathps->empty()) { tag->async_responder.Finish(grpc::Status::OK, tag); - tag->state = FINISH; - return; + return false; } frr::GetResponse response; @@ -554,86 +627,57 @@ void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag) if (!status.ok()) { tag->async_responder.WriteAndFinish( response, grpc::WriteOptions(), status, tag); - tag->state = FINISH; - return; + return false; } mypathps->pop_back(); if (mypathps->empty()) { tag->async_responder.WriteAndFinish( response, grpc::WriteOptions(), grpc::Status::OK, tag); - tag->state = FINISH; + return false; } else { tag->async_responder.Write(response, tag); - tag->state = MORE; + return true; } } -void HandleUnaryCreateCandidate(NewRpcState<frr::CreateCandidateRequest, - frr::CreateCandidateResponse> *tag) +grpc::Status HandleUnaryCreateCandidate( + UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); struct candidate *candidate = tag->cdb->create_candidate(); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, - "Can't create candidate configuration"), - tag); - } else { - tag->response.set_candidate_id(candidate->id); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - - tag->state = FINISH; + if (!candidate) + return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, + "Can't create candidate configuration"); + tag->response.set_candidate_id(candidate->id); + return grpc::Status::OK; } -void HandleUnaryDeleteCandidate(NewRpcState<frr::DeleteCandidateRequest, - frr::DeleteCandidateResponse> *tag) +grpc::Status HandleUnaryDeleteCandidate( + UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + grpc_debug("%s: entered", __func__); - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } - - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); - struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - } else { - tag->cdb->delete_candidate(candidate); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - tag->state = FINISH; + if (!tag->cdb->contains(candidate_id)) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); + tag->cdb->delete_candidate(candidate_id); + return grpc::Status::OK; } -void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest, - frr::UpdateCandidateResponse> *tag) +grpc::Status HandleUnaryUpdateCandidate( + UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); @@ -641,76 +685,45 @@ void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest, struct candidate *candidate = tag->cdb->get_candidate(candidate_id); if (!candidate) - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - else if (candidate->transaction) - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "candidate is in the middle of a transaction"), - tag); - else if (nb_candidate_update(candidate->config) != NB_OK) - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::INTERNAL, - "failed to update candidate configuration"), - tag); - - else - tag->responder.Finish(tag->response, grpc::Status::OK, tag); + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); + if (candidate->transaction) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "candidate is in the middle of a transaction"); + if (nb_candidate_update(candidate->config) != NB_OK) + return grpc::Status(grpc::StatusCode::INTERNAL, + "failed to update candidate configuration"); - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryEditCandidate( - NewRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> *tag) +grpc::Status HandleUnaryEditCandidate( + UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + grpc_debug("%s: entered", __func__); - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } - - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } + if (!candidate) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); struct nb_config *candidate_tmp = nb_config_dup(candidate->config); auto pvs = tag->request.update(); for (const frr::PathValue &pv : pvs) { - if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), pv.value()) - != 0) { + if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), + pv.value().c_str()) != 0) { nb_config_free(candidate_tmp); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Failed to update \"" + pv.path() - + "\""), - tag); - - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Failed to update \"" + pv.path() + + "\""); } } @@ -718,36 +731,23 @@ void HandleUnaryEditCandidate( for (const frr::PathValue &pv : pvs) { if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) { nb_config_free(candidate_tmp); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Failed to remove \"" + pv.path() - + "\""), - tag); - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Failed to remove \"" + pv.path() + + "\""); } } // No errors, accept all changes. nb_config_replace(candidate->config, candidate_tmp, false); - - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest, - frr::LoadToCandidateResponse> *tag) +grpc::Status HandleUnaryLoadToCandidate( + UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); @@ -757,59 +757,31 @@ void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest, // Request: DataTree config = 3; auto config = tag->request.config(); - struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } + if (!candidate) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); struct lyd_node *dnode = dnode_from_data_tree(&config, true); - if (!dnode) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to parse the configuration"), - tag); - tag->state = FINISH; - return; - } + if (!dnode) + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to parse the configuration"); struct nb_config *loaded_config = nb_config_new(dnode); - if (load_type == frr::LoadToCandidateRequest::REPLACE) nb_config_replace(candidate->config, loaded_config, false); - else if (nb_config_merge(candidate->config, loaded_config, false) - != NB_OK) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to merge the loaded configuration"), - tag); - tag->state = FINISH; - return; - } + else if (nb_config_merge(candidate->config, loaded_config, false) != + NB_OK) + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to merge the loaded configuration"); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryCommit( - NewRpcState<frr::CommitRequest, frr::CommitResponse> *tag) +grpc::Status +HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); @@ -823,15 +795,9 @@ void HandleUnaryCommit( // Find candidate configuration. struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } + if (!candidate) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); int ret = NB_OK; uint32_t transaction_id = 0; @@ -840,29 +806,17 @@ void HandleUnaryCommit( switch (phase) { case frr::CommitRequest::PREPARE: case frr::CommitRequest::ALL: - if (candidate->transaction) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "candidate is in the middle of a transaction"), - tag); - tag->state = FINISH; - return; - } + if (candidate->transaction) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "candidate is in the middle of a transaction"); break; case frr::CommitRequest::ABORT: case frr::CommitRequest::APPLY: - if (!candidate->transaction) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "no transaction in progress"), - tag); - tag->state = FINISH; - return; - } + if (!candidate->transaction) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "no transaction in progress"); break; default: break; @@ -942,53 +896,30 @@ void HandleUnaryCommit( if (strlen(errmsg) > 0) tag->response.set_error_message(errmsg); - tag->responder.Finish(tag->response, status, tag); - tag->state = FINISH; + return status; } -void HandleUnaryLockConfig( - NewRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) +grpc::Status HandleUnaryLockConfig( + UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + grpc_debug("%s: entered", __func__); - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } - - if (nb_running_lock(NB_CLIENT_GRPC, NULL)) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, - "running configuration is locked already"), - tag); - } else { - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - tag->state = FINISH; + if (nb_running_lock(NB_CLIENT_GRPC, NULL)) + return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, + "running configuration is locked already"); + return grpc::Status::OK; } -void HandleUnaryUnlockConfig( - NewRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag) +grpc::Status HandleUnaryUnlockConfig( + UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + grpc_debug("%s: entered", __func__); - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } - - if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "failed to unlock the running configuration"), - tag); - } else { - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - tag->state = FINISH; + if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "failed to unlock the running configuration"); + return grpc::Status::OK; } static void list_transactions_cb(void *arg, int transaction_id, @@ -1002,45 +933,34 @@ static void list_transactions_cb(void *arg, int transaction_id, std::string(date), std::string(comment))); } -void HandleStreamingListTransactions( - NewRpcState<frr::ListTransactionsRequest, frr::ListTransactionsResponse> - *tag) -{ - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - delete static_cast<std::list<std::tuple< - int, std::string, std::string, std::string>> *>( - tag->context); - tag->state = DELETED; - return; - } - - if (!tag->context) { - /* Creating, first time called for this RPC */ - auto new_list = - new std::list<std::tuple<int, std::string, std::string, - std::string>>(); - tag->context = new_list; - nb_db_transactions_iterate(list_transactions_cb, tag->context); +// Define the context variable type for this streaming handler +typedef std::list<std::tuple<int, std::string, std::string, std::string>> + ListTransactionsContextType; - new_list->push_back(std::make_tuple( +bool HandleStreamingListTransactions( + StreamRpcState<frr::ListTransactionsRequest, + frr::ListTransactionsResponse, + ListTransactionsContextType> *tag) +{ + grpc_debug("%s: entered", __func__); + + auto list = &tag->context; + if (tag->is_initial_process()) { + grpc_debug("%s: initialize streaming state", __func__); + // Fill our context container first time through + nb_db_transactions_iterate(list_transactions_cb, list); + list->push_back(std::make_tuple( 0xFFFF, std::string("fake client"), std::string("fake date"), std::string("fake comment"))); - new_list->push_back( - std::make_tuple(0xFFFE, std::string("fake client2"), - std::string("fake date"), - std::string("fake comment2"))); + list->push_back(std::make_tuple(0xFFFE, + std::string("fake client2"), + std::string("fake date"), + std::string("fake comment2"))); } - auto list = static_cast<std::list< - std::tuple<int, std::string, std::string, std::string>> *>( - tag->context); - if (list->empty()) { tag->async_responder.Finish(grpc::Status::OK, tag); - tag->state = FINISH; - return; + return false; } auto item = list->back(); @@ -1063,22 +983,18 @@ void HandleStreamingListTransactions( if (list->empty()) { tag->async_responder.WriteAndFinish( response, grpc::WriteOptions(), grpc::Status::OK, tag); - tag->state = FINISH; + return false; } else { tag->async_responder.Write(response, tag); - tag->state = MORE; + return true; } } -void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest, - frr::GetTransactionResponse> *tag) +grpc::Status HandleUnaryGetTransaction( + UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); // Request: uint32 transaction_id = 1; uint32_t transaction_id = tag->request.transaction_id(); @@ -1094,15 +1010,9 @@ void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest, // Load configuration from the transactions database. nb_config = nb_db_transaction_load(transaction_id); - if (!nb_config) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Transaction not found"), - tag); - tag->state = FINISH; - return; - } + if (!nb_config) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Transaction not found"); // Response: DataTree config = 1; auto config = tag->response.mutable_config(); @@ -1113,29 +1023,19 @@ void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest, encoding2lyd_format(encoding), with_defaults) != 0) { nb_config_free(nb_config); - tag->responder.Finish(tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to dump data"), - tag); - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to dump data"); } nb_config_free(nb_config); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryExecute( - NewRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) +grpc::Status HandleUnaryExecute( + UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); struct nb_node *nb_node; struct list *input_list; @@ -1150,26 +1050,14 @@ void HandleUnaryExecute( grpc_debug("%s(path: \"%s\")", __func__, xpath); - if (tag->request.path().empty()) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Data path is empty"), - tag); - tag->state = FINISH; - return; - } + if (tag->request.path().empty()) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Data path is empty"); nb_node = nb_node_find(xpath); - if (!nb_node) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Unknown data path"), - tag); - tag->state = FINISH; - return; - } + if (!nb_node) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Unknown data path"); input_list = yang_data_list_new(); output_list = yang_data_list_new(); @@ -1191,12 +1079,7 @@ void HandleUnaryExecute( list_delete(&input_list); list_delete(&output_list); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"), - tag); - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"); } // Process output parameters. @@ -1211,8 +1094,7 @@ void HandleUnaryExecute( list_delete(&input_list); list_delete(&output_list); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - tag->state = FINISH; + return grpc::Status::OK; } // ------------------------------------------------------ @@ -1222,20 +1104,21 @@ void HandleUnaryExecute( #define REQUEST_NEWRPC(NAME, cdb) \ do { \ - auto _rpcState = new NewRpcState<frr::NAME##Request, \ - frr::NAME##Response>( \ + auto _rpcState = new UnaryRpcState<frr::NAME##Request, \ + frr::NAME##Response>( \ (cdb), &frr::Northbound::AsyncService::Request##NAME, \ &HandleUnary##NAME, #NAME); \ - _rpcState->do_request(service, s_cq); \ + _rpcState->do_request(&service, cq.get(), true); \ } while (0) -#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \ +#define REQUEST_NEWRPC_STREAMING(NAME) \ do { \ - auto _rpcState = new NewRpcState<frr::NAME##Request, \ - frr::NAME##Response>( \ - (cdb), &frr::Northbound::AsyncService::Request##NAME, \ + auto _rpcState = new StreamRpcState<frr::NAME##Request, \ + frr::NAME##Response, \ + NAME##ContextType>( \ + &frr::Northbound::AsyncService::Request##NAME, \ &HandleStreaming##NAME, #NAME); \ - _rpcState->do_request(service, s_cq); \ + _rpcState->do_request(&service, cq.get(), true); \ } while (0) struct grpc_pthread_attr { @@ -1244,8 +1127,8 @@ struct grpc_pthread_attr { }; // Capture these objects so we can try to shut down cleanly -static std::unique_ptr<grpc::Server> s_server; -static grpc::ServerCompletionQueue *s_cq; +static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER; +static grpc::Server *s_server; static void *grpc_pthread_start(void *arg) { @@ -1255,20 +1138,24 @@ static void *grpc_pthread_start(void *arg) Candidates candidates; grpc::ServerBuilder builder; std::stringstream server_address; - frr::Northbound::AsyncService *service = - new frr::Northbound::AsyncService(); + frr::Northbound::AsyncService service; frr_pthread_set_name(fpt); server_address << "0.0.0.0:" << port; builder.AddListeningPort(server_address.str(), grpc::InsecureServerCredentials()); - builder.RegisterService(service); - auto cq = builder.AddCompletionQueue(); - s_cq = cq.get(); - s_server = builder.BuildAndStart(); + builder.RegisterService(&service); + builder.AddChannelArgument( + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); + std::unique_ptr<grpc::ServerCompletionQueue> cq = + builder.AddCompletionQueue(); + std::unique_ptr<grpc::Server> server = builder.BuildAndStart(); + s_server = server.get(); + + grpc_running = true; - /* Schedule all RPC handlers */ + /* Schedule unary RPC handlers */ REQUEST_NEWRPC(GetCapabilities, NULL); REQUEST_NEWRPC(CreateCandidate, &candidates); REQUEST_NEWRPC(DeleteCandidate, &candidates); @@ -1280,40 +1167,60 @@ static void *grpc_pthread_start(void *arg) REQUEST_NEWRPC(LockConfig, NULL); REQUEST_NEWRPC(UnlockConfig, NULL); REQUEST_NEWRPC(Execute, NULL); - REQUEST_NEWRPC_STREAMING(Get, NULL); - REQUEST_NEWRPC_STREAMING(ListTransactions, NULL); + + /* Schedule streaming RPC handlers */ + REQUEST_NEWRPC_STREAMING(Get); + REQUEST_NEWRPC_STREAMING(ListTransactions); zlog_notice("gRPC server listening on %s", server_address.str().c_str()); /* Process inbound RPCs */ + bool ok; + void *tag; while (true) { - void *tag; - bool ok; - - s_cq->Next(&tag, &ok); - if (!ok) + if (!cq->Next(&tag, &ok)) { + grpc_debug("%s: CQ empty exiting", __func__); break; + } + + grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag, + ok); - grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__, - tag, ok); + if (!ok) { + delete static_cast<RpcStateBase *>(tag); + break; + } RpcStateBase *rpc = static_cast<RpcStateBase *>(tag); - CallState state = rpc->doCallback(); - grpc_debug("%s: Callback returned RPC State: %s", __func__, - call_states[state]); + if (rpc->get_state() != FINISH) + rpc->run(&service, cq.get()); + else { + grpc_debug("%s RPC FINISH -> [delete]", rpc->name); + delete rpc; + } + } - /* - * Our side is done (FINISH) receive new requests of this type - * We could do this earlier but that would mean we could be - * handling multiple same type requests in parallel. We expect - * to be called back once more in the FINISH state (from the - * user indicating Finish() for cleanup. - */ - if (state == FINISH) - rpc->do_request(service, s_cq); + /* This was probably done for us to get here, but let's be safe */ + pthread_mutex_lock(&s_server_lock); + grpc_running = false; + if (s_server) { + grpc_debug("%s: shutdown server and CQ", __func__); + server->Shutdown(); + s_server = NULL; + } + pthread_mutex_unlock(&s_server_lock); + + grpc_debug("%s: shutting down CQ", __func__); + cq->Shutdown(); + + grpc_debug("%s: draining the CQ", __func__); + while (cq->Next(&tag, &ok)) { + grpc_debug("%s: drain tag %p", __func__, tag); + delete static_cast<RpcStateBase *>(tag); } + zlog_info("%s: exiting from grpc pthread", __func__); return NULL; } @@ -1325,6 +1232,8 @@ static int frr_grpc_init(uint port) .stop = NULL, }; + grpc_debug("%s: entered", __func__); + fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc"); fpt->data = reinterpret_cast<void *>((intptr_t)port); @@ -1340,23 +1249,31 @@ static int frr_grpc_init(uint port) static int frr_grpc_finish(void) { - // Shutdown the grpc server - if (s_server) { - s_server->Shutdown(); - s_cq->Shutdown(); + grpc_debug("%s: entered", __func__); - // And drain the queue - void *ignore; - bool ok; + if (!fpt) + return 0; - while (s_cq->Next(&ignore, &ok)) - ; + /* + * Shut the server down here in main thread. This will cause the wait on + * the completion queue (cq.Next()) to exit and cleanup everything else. + */ + pthread_mutex_lock(&s_server_lock); + grpc_running = false; + if (s_server) { + grpc_debug("%s: shutdown server", __func__); + s_server->Shutdown(); + s_server = NULL; } + pthread_mutex_unlock(&s_server_lock); - if (fpt) { - pthread_join(fpt->thread, NULL); - frr_pthread_destroy(fpt); - } + grpc_debug("%s: joining and destroy grpc thread", __func__); + pthread_join(fpt->thread, NULL); + frr_pthread_destroy(fpt); + + // Fix protobuf 'memory leaks' during shutdown. + // https://groups.google.com/g/protobuf/c/4y_EmQiCGgs + google::protobuf::ShutdownProtobufLibrary(); return 0; } |
