From 48c93061f581dfea6beda4c9934db2bdb22bc774 Mon Sep 17 00:00:00 2001 From: Christian Hopps Date: Sun, 20 Feb 2022 20:30:52 -0500 Subject: [PATCH] lib: grpc: rework RPC handlers improve code clarity - split NewRpcState object into 2, a Unary and a Streaming variant, which then allows for the next. - move all state machine details inside these new state objects - use a template arg to allow for Streaming state tracking object creation and deletion w/o requiring this in each specific RPC hander. - Code is more rugged by design now. Thanks to Rafael Zalamena for the cleanup ideas/motivation. Signed-off-by: Christian Hopps --- lib/northbound_grpc.cpp | 812 +++++++++++++++++----------------------- 1 file changed, 337 insertions(+), 475 deletions(-) diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index 0a458b6262..dda1756214 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -39,6 +39,11 @@ #define GRPC_DEFAULT_PORT 50051 + +// ------------------------------------------------------ +// File Local Variables +// ------------------------------------------------------ + /* * NOTE: we can't use the FRR debugging infrastructure here since it uses * atomics and C++ has a different atomics API. Enable gRPC debugging @@ -121,112 +126,79 @@ class Candidates std::map _cdb; }; +/* + * RpcStateBase is the common base class used to track a gRPC RPC. + */ class RpcStateBase { public: - virtual ~RpcStateBase() = default; - virtual CallState doCallback() = 0; virtual void do_request(::frr::Northbound::AsyncService *service, ::grpc::ServerCompletionQueue *cq, bool no_copy) = 0; -}; -/* - * The RPC state class is used to track the execution of an RPC. - */ -template class NewRpcState : RpcStateBase -{ - typedef void (frr::Northbound::AsyncService::*reqfunc_t)( - ::grpc::ServerContext *, Q *, - ::grpc::ServerAsyncResponseWriter *, - ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, - void *); - typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( - ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter *, - ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, - void *); + RpcStateBase(const char *name) : name(name){}; - public: - NewRpcState(Candidates *cdb, reqfunc_t rfunc, - void (*cb)(NewRpcState *), const char *name) - : requestf(rfunc), callback(cb), responder(&ctx), - async_responder(&ctx), name(name), cdb(cdb){}; - NewRpcState(Candidates *cdb, reqsfunc_t rfunc, - void (*cb)(NewRpcState *), const char *name) - : requestsf(rfunc), callback(cb), responder(&ctx), - async_responder(&ctx), name(name), cdb(cdb){}; - - CallState doCallback() override + virtual ~RpcStateBase() = default; + + CallState get_state() const { - CallState enter_state = this->state; - CallState new_state; - if (enter_state == FINISH) { - grpc_debug("%s RPC FINISH -> DELETED", name); - new_state = FINISH; - } else { - grpc_debug("%s RPC: %s -> PROCESS", name, - call_states[this->state]); - new_state = PROCESS; - } + return state; + } + + bool is_initial_process() const + { + /* Will always be true for Unary */ + return entered_state == CREATE; + } + + // Returns "more" status, if false caller can delete + bool run(frr::Northbound::AsyncService *service, + grpc::ServerCompletionQueue *cq) + { + /* + * We enter in either CREATE or MORE state, and transition to + * PROCESS state. + */ + this->entered_state = this->state; + this->state = PROCESS; + grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name, + call_states[this->entered_state], + call_states[this->state]); /* - * We are either in state CREATE, MORE or FINISH. If CREATE or - * MORE move back to PROCESS, otherwise we are cleaning up - * (FINISH) so leave it in that state. Run the callback on the - * main threadmaster/pthread; and wait for expected transition - * from main thread. If transition is to FINISH->DELETED. - * delete us. - * - * We update the state prior to scheduling the callback which - * may then update the state in the master pthread. Then we - * obtain the lock in the condvar-check-loop as the callback - * will be modifying updating the state value. + * We schedule the callback on the main pthread, and wait for + * the state to transition out of the PROCESS state. The new + * state will either be MORE or FINISH. It will always be FINISH + * for Unary RPCs. */ - this->state = new_state; thread_add_event(main_master, c_callback, (void *)this, 0, NULL); + pthread_mutex_lock(&this->cmux); - while (this->state == new_state) + while (this->state == PROCESS) pthread_cond_wait(&this->cond, &this->cmux); pthread_mutex_unlock(&this->cmux); - if (enter_state == FINISH) - assert(this->state == DELETED); - - if (this->state == DELETED) { - grpc_debug("%s RPC: -> [DELETED]", name); - delete this; - return DELETED; - } - return this->state; - } - - void do_request(::frr::Northbound::AsyncService *service, - ::grpc::ServerCompletionQueue *cq, - bool no_copy) override - { - grpc_debug("%s, posting a request for: %s", __func__, name); - if (requestf) { - NewRpcState *copy = - no_copy ? this - : new NewRpcState(cdb, requestf, - callback, name); - (service->*requestf)(©->ctx, ©->request, - ©->responder, cq, cq, copy); - } else { - NewRpcState *copy = - no_copy ? this - : new NewRpcState(cdb, requestsf, - callback, name); - (service->*requestsf)(©->ctx, ©->request, - ©->async_responder, cq, cq, - copy); + grpc_debug("%s RPC in %s on grpc-io-thread", name, + call_states[this->state]); + + if (this->state == FINISH) { + /* + * Server is done (FINISH) so prep to receive a new + * request of this type. We could do this earlier but + * that would mean we could be handling multiple same + * type requests in parallel without limit. + */ + this->do_request(service, cq, false); } + return true; } + protected: + virtual CallState run_mainthread(struct thread *thread) = 0; static void c_callback(struct thread *thread) { - auto _tag = static_cast *>(thread->arg); + auto _tag = static_cast(thread->arg); /* * We hold the lock until the callback finishes and has updated * _tag->state, then we signal done and release. @@ -234,11 +206,12 @@ template class NewRpcState : RpcStateBase pthread_mutex_lock(&_tag->cmux); CallState enter_state = _tag->state; - grpc_debug("%s RPC running on main thread", _tag->name); + grpc_debug("%s RPC: running %s on main thread", _tag->name, + call_states[enter_state]); - _tag->callback(_tag); + _tag->state = _tag->run_mainthread(thread); - grpc_debug("%s RPC: %s -> %s", _tag->name, + grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name, call_states[enter_state], call_states[_tag->state]); pthread_cond_signal(&_tag->cond); @@ -246,23 +219,118 @@ template class NewRpcState : RpcStateBase return; } - const char *name; grpc::ServerContext ctx; + pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + CallState state = CREATE; + CallState entered_state; + + public: + const char *name; +}; + +/* + * The UnaryRpcState class is used to track the execution of a Unary RPC. + * + * Template Args: + * Q - the request type for a given unary RPC + * S - the response type for a given unary RPC + */ +template class UnaryRpcState : public RpcStateBase +{ + public: + typedef void (frr::Northbound::AsyncService::*reqfunc_t)( + ::grpc::ServerContext *, Q *, + ::grpc::ServerAsyncResponseWriter *, + ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, + void *); + + UnaryRpcState(Candidates *cdb, reqfunc_t rfunc, + grpc::Status (*cb)(UnaryRpcState *), + const char *name) + : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb), + responder(&ctx){}; + + void do_request(::frr::Northbound::AsyncService *service, + ::grpc::ServerCompletionQueue *cq, + bool no_copy) override + { + grpc_debug("%s, posting a request for: %s", __func__, name); + auto copy = no_copy ? this + : new UnaryRpcState(cdb, requestf, callback, + name); + (service->*requestf)(©->ctx, ©->request, + ©->responder, cq, cq, copy); + } + + CallState run_mainthread(struct thread *thread) override + { + // Unary RPC are always finished, see "Unary" :) + grpc::Status status = this->callback(this); + responder.Finish(response, status, this); + return FINISH; + } + + Candidates *cdb; + Q request; S response; grpc::ServerAsyncResponseWriter responder; - grpc::ServerAsyncWriter async_responder; - Candidates *cdb; - void (*callback)(NewRpcState *); + grpc::Status (*callback)(UnaryRpcState *); reqfunc_t requestf = NULL; - reqsfunc_t requestsf = NULL; +}; - pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; - pthread_cond_t cond = PTHREAD_COND_INITIALIZER; - void *context = 0; +/* + * The StreamRpcState class is used to track the execution of a Streaming RPC. + * + * Template Args: + * Q - the request type for a given streaming RPC + * S - the response type for a given streaming RPC + * X - the type used to track the streaming state + */ +template +class StreamRpcState : public RpcStateBase +{ + public: + typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( + ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter *, + ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, + void *); - CallState state = CREATE; + StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState *), + const char *name) + : RpcStateBase(name), requestsf(rfunc), callback(cb), + async_responder(&ctx){}; + + void do_request(::frr::Northbound::AsyncService *service, + ::grpc::ServerCompletionQueue *cq, + bool no_copy) override + { + grpc_debug("%s, posting a request for: %s", __func__, name); + auto copy = + no_copy ? this + : new StreamRpcState(requestsf, callback, name); + (service->*requestsf)(©->ctx, ©->request, + ©->async_responder, cq, cq, copy); + } + + CallState run_mainthread(struct thread *thread) override + { + if (this->callback(this)) + return MORE; + else + return FINISH; + } + + Q request; + S response; + grpc::ServerAsyncWriter async_responder; + + bool (*callback)(StreamRpcState *); + reqsfunc_t requestsf = NULL; + + X context; }; // ------------------------------------------------------ @@ -481,15 +549,11 @@ static grpc::Status get_path(frr::DataTree *dt, const std::string &path, // RPC Callback Functions: run on main thread // ------------------------------------------------------ -void HandleUnaryGetCapabilities(NewRpcState *tag) +grpc::Status HandleUnaryGetCapabilities( + UnaryRpcState + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); // Response: string frr_version = 1; tag->response.set_frr_version(FRR_VERSION); @@ -515,30 +579,24 @@ void HandleUnaryGetCapabilities(NewRpcStateresponse.add_supported_encodings(frr::JSON); tag->response.add_supported_encodings(frr::XML); - /* Should we do this in the async process call? */ - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - - /* Indicate we are done. */ - tag->state = FINISH; + return grpc::Status::OK; } -void HandleStreamingGet(NewRpcState *tag) -{ - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); +// Define the context variable type for this streaming handler +typedef std::list GetContextType; - if (tag->state == FINISH) { - delete static_cast *>(tag->context); - tag->state = DELETED; - return; - } +bool HandleStreamingGet( + StreamRpcState *tag) +{ + grpc_debug("%s: entered", __func__); - if (!tag->context) { - /* Creating, first time called for this RPC */ - auto mypaths = new std::list(); - tag->context = mypaths; + auto mypathps = &tag->context; + if (tag->is_initial_process()) { + // Fill our context container first time through + grpc_debug("%s: initialize streaming state", __func__); auto paths = tag->request.path(); for (const std::string &path : paths) { - mypaths->push_back(std::string(path)); + mypathps->push_back(std::string(path)); } } @@ -549,11 +607,9 @@ void HandleStreamingGet(NewRpcState *tag) // Request: bool with_defaults = 3; bool with_defaults = tag->request.with_defaults(); - auto mypathps = static_cast *>(tag->context); if (mypathps->empty()) { tag->async_responder.Finish(grpc::Status::OK, tag); - tag->state = FINISH; - return; + return false; } frr::GetResponse response; @@ -571,85 +627,57 @@ void HandleStreamingGet(NewRpcState *tag) if (!status.ok()) { tag->async_responder.WriteAndFinish( response, grpc::WriteOptions(), status, tag); - tag->state = FINISH; - return; + return false; } mypathps->pop_back(); if (mypathps->empty()) { tag->async_responder.WriteAndFinish( response, grpc::WriteOptions(), grpc::Status::OK, tag); - tag->state = FINISH; + return false; } else { tag->async_responder.Write(response, tag); - tag->state = MORE; + return true; } } -void HandleUnaryCreateCandidate(NewRpcState *tag) +grpc::Status HandleUnaryCreateCandidate( + UnaryRpcState + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); struct candidate *candidate = tag->cdb->create_candidate(); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, - "Can't create candidate configuration"), - tag); - } else { - tag->response.set_candidate_id(candidate->id); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - - tag->state = FINISH; + if (!candidate) + return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, + "Can't create candidate configuration"); + tag->response.set_candidate_id(candidate->id); + return grpc::Status::OK; } -void HandleUnaryDeleteCandidate(NewRpcState *tag) +grpc::Status HandleUnaryDeleteCandidate( + UnaryRpcState + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); - if (!tag->cdb->contains(candidate_id)) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - } else { - tag->cdb->delete_candidate(candidate_id); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - tag->state = FINISH; + if (!tag->cdb->contains(candidate_id)) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); + tag->cdb->delete_candidate(candidate_id); + return grpc::Status::OK; } -void HandleUnaryUpdateCandidate(NewRpcState *tag) +grpc::Status HandleUnaryUpdateCandidate( + UnaryRpcState + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); @@ -657,58 +685,33 @@ void HandleUnaryUpdateCandidate(NewRpcStatecdb->get_candidate(candidate_id); if (!candidate) - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - else if (candidate->transaction) - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "candidate is in the middle of a transaction"), - tag); - else if (nb_candidate_update(candidate->config) != NB_OK) - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::INTERNAL, - "failed to update candidate configuration"), - tag); - - else - tag->responder.Finish(tag->response, grpc::Status::OK, tag); + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); + if (candidate->transaction) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "candidate is in the middle of a transaction"); + if (nb_candidate_update(candidate->config) != NB_OK) + return grpc::Status(grpc::StatusCode::INTERNAL, + "failed to update candidate configuration"); - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryEditCandidate( - NewRpcState *tag) +grpc::Status HandleUnaryEditCandidate( + UnaryRpcState + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } + if (!candidate) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); struct nb_config *candidate_tmp = nb_config_dup(candidate->config); @@ -718,15 +721,9 @@ void HandleUnaryEditCandidate( pv.value().c_str()) != 0) { nb_config_free(candidate_tmp); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Failed to update \"" + pv.path() - + "\""), - tag); - - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Failed to update \"" + pv.path() + + "\""); } } @@ -734,36 +731,23 @@ void HandleUnaryEditCandidate( for (const frr::PathValue &pv : pvs) { if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) { nb_config_free(candidate_tmp); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Failed to remove \"" + pv.path() - + "\""), - tag); - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Failed to remove \"" + pv.path() + + "\""); } } // No errors, accept all changes. nb_config_replace(candidate->config, candidate_tmp, false); - - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryLoadToCandidate(NewRpcState *tag) +grpc::Status HandleUnaryLoadToCandidate( + UnaryRpcState + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); @@ -773,59 +757,31 @@ void HandleUnaryLoadToCandidate(NewRpcStaterequest.config(); - struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } + if (!candidate) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); struct lyd_node *dnode = dnode_from_data_tree(&config, true); - if (!dnode) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to parse the configuration"), - tag); - tag->state = FINISH; - return; - } + if (!dnode) + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to parse the configuration"); struct nb_config *loaded_config = nb_config_new(dnode); - if (load_type == frr::LoadToCandidateRequest::REPLACE) nb_config_replace(candidate->config, loaded_config, false); - else if (nb_config_merge(candidate->config, loaded_config, false) - != NB_OK) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to merge the loaded configuration"), - tag); - tag->state = FINISH; - return; - } + else if (nb_config_merge(candidate->config, loaded_config, false) != + NB_OK) + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to merge the loaded configuration"); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryCommit( - NewRpcState *tag) +grpc::Status +HandleUnaryCommit(UnaryRpcState *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); @@ -839,15 +795,9 @@ void HandleUnaryCommit( // Find candidate configuration. struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } + if (!candidate) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); int ret = NB_OK; uint32_t transaction_id = 0; @@ -856,29 +806,17 @@ void HandleUnaryCommit( switch (phase) { case frr::CommitRequest::PREPARE: case frr::CommitRequest::ALL: - if (candidate->transaction) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "candidate is in the middle of a transaction"), - tag); - tag->state = FINISH; - return; - } + if (candidate->transaction) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "candidate is in the middle of a transaction"); break; case frr::CommitRequest::ABORT: case frr::CommitRequest::APPLY: - if (!candidate->transaction) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "no transaction in progress"), - tag); - tag->state = FINISH; - return; - } + if (!candidate->transaction) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "no transaction in progress"); break; default: break; @@ -958,53 +896,30 @@ void HandleUnaryCommit( if (strlen(errmsg) > 0) tag->response.set_error_message(errmsg); - tag->responder.Finish(tag->response, status, tag); - tag->state = FINISH; + return status; } -void HandleUnaryLockConfig( - NewRpcState *tag) +grpc::Status HandleUnaryLockConfig( + UnaryRpcState *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - if (nb_running_lock(NB_CLIENT_GRPC, NULL)) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, - "running configuration is locked already"), - tag); - } else { - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - tag->state = FINISH; + if (nb_running_lock(NB_CLIENT_GRPC, NULL)) + return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, + "running configuration is locked already"); + return grpc::Status::OK; } -void HandleUnaryUnlockConfig( - NewRpcState *tag) +grpc::Status HandleUnaryUnlockConfig( + UnaryRpcState *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "failed to unlock the running configuration"), - tag); - } else { - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - tag->state = FINISH; + if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "failed to unlock the running configuration"); + return grpc::Status::OK; } static void list_transactions_cb(void *arg, int transaction_id, @@ -1018,45 +933,34 @@ static void list_transactions_cb(void *arg, int transaction_id, std::string(date), std::string(comment))); } -void HandleStreamingListTransactions( - NewRpcState - *tag) -{ - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - delete static_cast> *>( - tag->context); - tag->state = DELETED; - return; - } +// Define the context variable type for this streaming handler +typedef std::list> + ListTransactionsContextType; - if (!tag->context) { - /* Creating, first time called for this RPC */ - auto new_list = - new std::list>(); - tag->context = new_list; - nb_db_transactions_iterate(list_transactions_cb, tag->context); +bool HandleStreamingListTransactions( + StreamRpcState *tag) +{ + grpc_debug("%s: entered", __func__); - new_list->push_back(std::make_tuple( + auto list = &tag->context; + if (tag->is_initial_process()) { + grpc_debug("%s: initialize streaming state", __func__); + // Fill our context container first time through + nb_db_transactions_iterate(list_transactions_cb, list); + list->push_back(std::make_tuple( 0xFFFF, std::string("fake client"), std::string("fake date"), std::string("fake comment"))); - new_list->push_back( - std::make_tuple(0xFFFE, std::string("fake client2"), - std::string("fake date"), - std::string("fake comment2"))); + list->push_back(std::make_tuple(0xFFFE, + std::string("fake client2"), + std::string("fake date"), + std::string("fake comment2"))); } - auto list = static_cast> *>( - tag->context); - if (list->empty()) { tag->async_responder.Finish(grpc::Status::OK, tag); - tag->state = FINISH; - return; + return false; } auto item = list->back(); @@ -1079,22 +983,18 @@ void HandleStreamingListTransactions( if (list->empty()) { tag->async_responder.WriteAndFinish( response, grpc::WriteOptions(), grpc::Status::OK, tag); - tag->state = FINISH; + return false; } else { tag->async_responder.Write(response, tag); - tag->state = MORE; + return true; } } -void HandleUnaryGetTransaction(NewRpcState *tag) +grpc::Status HandleUnaryGetTransaction( + UnaryRpcState + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); // Request: uint32 transaction_id = 1; uint32_t transaction_id = tag->request.transaction_id(); @@ -1110,15 +1010,9 @@ void HandleUnaryGetTransaction(NewRpcStateresponder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Transaction not found"), - tag); - tag->state = FINISH; - return; - } + if (!nb_config) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Transaction not found"); // Response: DataTree config = 1; auto config = tag->response.mutable_config(); @@ -1129,29 +1023,19 @@ void HandleUnaryGetTransaction(NewRpcStateresponder.Finish(tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to dump data"), - tag); - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to dump data"); } nb_config_free(nb_config); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryExecute( - NewRpcState *tag) +grpc::Status HandleUnaryExecute( + UnaryRpcState *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); struct nb_node *nb_node; struct list *input_list; @@ -1166,26 +1050,14 @@ void HandleUnaryExecute( grpc_debug("%s(path: \"%s\")", __func__, xpath); - if (tag->request.path().empty()) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Data path is empty"), - tag); - tag->state = FINISH; - return; - } + if (tag->request.path().empty()) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Data path is empty"); nb_node = nb_node_find(xpath); - if (!nb_node) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Unknown data path"), - tag); - tag->state = FINISH; - return; - } + if (!nb_node) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Unknown data path"); input_list = yang_data_list_new(); output_list = yang_data_list_new(); @@ -1207,12 +1079,7 @@ void HandleUnaryExecute( list_delete(&input_list); list_delete(&output_list); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"), - tag); - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"); } // Process output parameters. @@ -1227,8 +1094,7 @@ void HandleUnaryExecute( list_delete(&input_list); list_delete(&output_list); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - tag->state = FINISH; + return grpc::Status::OK; } // ------------------------------------------------------ @@ -1238,18 +1104,19 @@ void HandleUnaryExecute( #define REQUEST_NEWRPC(NAME, cdb) \ do { \ - auto _rpcState = new NewRpcState( \ + auto _rpcState = new UnaryRpcState( \ (cdb), &frr::Northbound::AsyncService::Request##NAME, \ &HandleUnary##NAME, #NAME); \ _rpcState->do_request(&service, cq.get(), true); \ } while (0) -#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \ +#define REQUEST_NEWRPC_STREAMING(NAME) \ do { \ - auto _rpcState = new NewRpcState( \ - (cdb), &frr::Northbound::AsyncService::Request##NAME, \ + auto _rpcState = new StreamRpcState( \ + &frr::Northbound::AsyncService::Request##NAME, \ &HandleStreaming##NAME, #NAME); \ _rpcState->do_request(&service, cq.get(), true); \ } while (0) @@ -1288,7 +1155,7 @@ static void *grpc_pthread_start(void *arg) grpc_running = true; - /* Schedule all RPC handlers */ + /* Schedule unary RPC handlers */ REQUEST_NEWRPC(GetCapabilities, NULL); REQUEST_NEWRPC(CreateCandidate, &candidates); REQUEST_NEWRPC(DeleteCandidate, &candidates); @@ -1300,8 +1167,10 @@ static void *grpc_pthread_start(void *arg) REQUEST_NEWRPC(LockConfig, NULL); REQUEST_NEWRPC(UnlockConfig, NULL); REQUEST_NEWRPC(Execute, NULL); - REQUEST_NEWRPC_STREAMING(Get, NULL); - REQUEST_NEWRPC_STREAMING(ListTransactions, NULL); + + /* Schedule streaming RPC handlers */ + REQUEST_NEWRPC_STREAMING(Get); + REQUEST_NEWRPC_STREAMING(ListTransactions); zlog_notice("gRPC server listening on %s", server_address.str().c_str()); @@ -1309,7 +1178,7 @@ static void *grpc_pthread_start(void *arg) /* Process inbound RPCs */ bool ok; void *tag; - while (grpc_running) { + while (true) { if (!cq->Next(&tag, &ok)) { grpc_debug("%s: CQ empty exiting", __func__); break; @@ -1318,25 +1187,18 @@ static void *grpc_pthread_start(void *arg) grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag, ok); - if (!ok || !grpc_running) { + if (!ok) { delete static_cast(tag); break; } RpcStateBase *rpc = static_cast(tag); - CallState state = rpc->doCallback(); - grpc_debug("%s: callback returned RPC State: %s", __func__, - call_states[state]); - - /* - * Our side is done (FINISH) receive new requests of this type - * We could do this earlier but that would mean we could be - * handling multiple same type requests in parallel. We expect - * to be called back once more in the FINISH state (from the - * user indicating Finish() for cleanup. - */ - if (state == FINISH && grpc_running) - rpc->do_request(&service, cq.get(), false); + if (rpc->get_state() != FINISH) + rpc->run(&service, cq.get()); + else { + grpc_debug("%s RPC FINISH -> [delete]", rpc->name); + delete rpc; + } } /* This was probably done for us to get here, but let's be safe */ -- 2.39.5