]> git.puffer.fish Git - mirror/frr.git/commitdiff
lib: grpc: rework RPC handlers improve code clarity 10739/head
authorChristian Hopps <chopps@labn.net>
Mon, 21 Feb 2022 01:30:52 +0000 (20:30 -0500)
committerChristian Hopps <chopps@labn.net>
Mon, 14 Mar 2022 19:54:25 +0000 (15:54 -0400)
- split NewRpcState object into 2, a Unary and a Streaming variant, which
  then allows for the next.
- move all state machine details inside these new state objects
  - use a template arg to allow for Streaming state tracking object
    creation and deletion w/o requiring this in each specific RPC
    hander.
- Code is more rugged by design now.

Thanks to Rafael Zalamena <rzalamena@opensourcerouting.org> for the cleanup
ideas/motivation.

Signed-off-by: Christian Hopps <chopps@labn.net>
lib/northbound_grpc.cpp

index 0a458b6262e985d26ebd011184c72bc15848606c..dda1756214dcc1ec4796228c51ec031163fc6809 100644 (file)
 
 #define GRPC_DEFAULT_PORT 50051
 
+
+// ------------------------------------------------------
+//                 File Local Variables
+// ------------------------------------------------------
+
 /*
  * NOTE: we can't use the FRR debugging infrastructure here since it uses
  * atomics and C++ has a different atomics API. Enable gRPC debugging
@@ -121,112 +126,79 @@ class Candidates
        std::map<uint64_t, struct candidate> _cdb;
 };
 
+/*
+ * RpcStateBase is the common base class used to track a gRPC RPC.
+ */
 class RpcStateBase
 {
       public:
-       virtual ~RpcStateBase() = default;
-       virtual CallState doCallback() = 0;
        virtual void do_request(::frr::Northbound::AsyncService *service,
                                ::grpc::ServerCompletionQueue *cq,
                                bool no_copy) = 0;
-};
 
-/*
- * The RPC state class is used to track the execution of an RPC.
- */
-template <typename Q, typename S> class NewRpcState : RpcStateBase
-{
-       typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
-               ::grpc::ServerContext *, Q *,
-               ::grpc::ServerAsyncResponseWriter<S> *,
-               ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
-               void *);
-       typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
-               ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
-               ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
-               void *);
+       RpcStateBase(const char *name) : name(name){};
 
-      public:
-       NewRpcState(Candidates *cdb, reqfunc_t rfunc,
-                   void (*cb)(NewRpcState<Q, S> *), const char *name)
-           : requestf(rfunc), callback(cb), responder(&ctx),
-             async_responder(&ctx), name(name), cdb(cdb){};
-       NewRpcState(Candidates *cdb, reqsfunc_t rfunc,
-                   void (*cb)(NewRpcState<Q, S> *), const char *name)
-           : requestsf(rfunc), callback(cb), responder(&ctx),
-             async_responder(&ctx), name(name), cdb(cdb){};
-
-       CallState doCallback() override
+       virtual ~RpcStateBase() = default;
+
+       CallState get_state() const
        {
-               CallState enter_state = this->state;
-               CallState new_state;
-               if (enter_state == FINISH) {
-                       grpc_debug("%s RPC FINISH -> DELETED", name);
-                       new_state = FINISH;
-               } else {
-                       grpc_debug("%s RPC: %s -> PROCESS", name,
-                                  call_states[this->state]);
-                       new_state = PROCESS;
-               }
+               return state;
+       }
+
+       bool is_initial_process() const
+       {
+               /* Will always be true for Unary */
+               return entered_state == CREATE;
+       }
+
+       // Returns "more" status, if false caller can delete
+       bool run(frr::Northbound::AsyncService *service,
+                grpc::ServerCompletionQueue *cq)
+       {
+               /*
+                * We enter in either CREATE or MORE state, and transition to
+                * PROCESS state.
+                */
+               this->entered_state = this->state;
+               this->state = PROCESS;
+               grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name,
+                          call_states[this->entered_state],
+                          call_states[this->state]);
                /*
-                * We are either in state CREATE, MORE or FINISH. If CREATE or
-                * MORE move back to PROCESS, otherwise we are cleaning up
-                * (FINISH) so leave it in that state. Run the callback on the
-                * main threadmaster/pthread; and wait for expected transition
-                * from main thread. If transition is to FINISH->DELETED.
-                * delete us.
-                *
-                * We update the state prior to scheduling the callback which
-                * may then update the state in the master pthread. Then we
-                * obtain the lock in the condvar-check-loop as the callback
-                * will be modifying updating the state value.
+                * We schedule the callback on the main pthread, and wait for
+                * the state to transition out of the PROCESS state. The new
+                * state will either be MORE or FINISH. It will always be FINISH
+                * for Unary RPCs.
                 */
-               this->state = new_state;
                thread_add_event(main_master, c_callback, (void *)this, 0,
                                 NULL);
+
                pthread_mutex_lock(&this->cmux);
-               while (this->state == new_state)
+               while (this->state == PROCESS)
                        pthread_cond_wait(&this->cond, &this->cmux);
                pthread_mutex_unlock(&this->cmux);
 
-               if (enter_state == FINISH)
-                       assert(this->state == DELETED);
-
-               if (this->state == DELETED) {
-                       grpc_debug("%s RPC: -> [DELETED]", name);
-                       delete this;
-                       return DELETED;
-               }
-               return this->state;
-       }
-
-       void do_request(::frr::Northbound::AsyncService *service,
-                       ::grpc::ServerCompletionQueue *cq,
-                       bool no_copy) override
-       {
-               grpc_debug("%s, posting a request for: %s", __func__, name);
-               if (requestf) {
-                       NewRpcState<Q, S> *copy =
-                               no_copy ? this
-                                       : new NewRpcState(cdb, requestf,
-                                                         callback, name);
-                       (service->*requestf)(&copy->ctx, &copy->request,
-                                            &copy->responder, cq, cq, copy);
-               } else {
-                       NewRpcState<Q, S> *copy =
-                               no_copy ? this
-                                       : new NewRpcState(cdb, requestsf,
-                                                         callback, name);
-                       (service->*requestsf)(&copy->ctx, &copy->request,
-                                             &copy->async_responder, cq, cq,
-                                             copy);
+               grpc_debug("%s RPC in %s on grpc-io-thread", name,
+                          call_states[this->state]);
+
+               if (this->state == FINISH) {
+                       /*
+                        * Server is done (FINISH) so prep to receive a new
+                        * request of this type. We could do this earlier but
+                        * that would mean we could be handling multiple same
+                        * type requests in parallel without limit.
+                        */
+                       this->do_request(service, cq, false);
                }
+               return true;
        }
 
+      protected:
+       virtual CallState run_mainthread(struct thread *thread) = 0;
 
        static void c_callback(struct thread *thread)
        {
-               auto _tag = static_cast<NewRpcState<Q, S> *>(thread->arg);
+               auto _tag = static_cast<RpcStateBase *>(thread->arg);
                /*
                 * We hold the lock until the callback finishes and has updated
                 * _tag->state, then we signal done and release.
@@ -234,11 +206,12 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase
                pthread_mutex_lock(&_tag->cmux);
 
                CallState enter_state = _tag->state;
-               grpc_debug("%s RPC running on main thread", _tag->name);
+               grpc_debug("%s RPC: running %s on main thread", _tag->name,
+                          call_states[enter_state]);
 
-               _tag->callback(_tag);
+               _tag->state = _tag->run_mainthread(thread);
 
-               grpc_debug("%s RPC: %s -> %s", _tag->name,
+               grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name,
                           call_states[enter_state], call_states[_tag->state]);
 
                pthread_cond_signal(&_tag->cond);
@@ -246,23 +219,118 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase
                return;
        }
 
-       const char *name;
        grpc::ServerContext ctx;
+       pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
+       pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+       CallState state = CREATE;
+       CallState entered_state;
+
+      public:
+       const char *name;
+};
+
+/*
+ * The UnaryRpcState class is used to track the execution of a Unary RPC.
+ *
+ * Template Args:
+ *     Q - the request type for a given unary RPC
+ *     S - the response type for a given unary RPC
+ */
+template <typename Q, typename S> class UnaryRpcState : public RpcStateBase
+{
+      public:
+       typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
+               ::grpc::ServerContext *, Q *,
+               ::grpc::ServerAsyncResponseWriter<S> *,
+               ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
+               void *);
+
+       UnaryRpcState(Candidates *cdb, reqfunc_t rfunc,
+                     grpc::Status (*cb)(UnaryRpcState<Q, S> *),
+                     const char *name)
+           : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb),
+             responder(&ctx){};
+
+       void do_request(::frr::Northbound::AsyncService *service,
+                       ::grpc::ServerCompletionQueue *cq,
+                       bool no_copy) override
+       {
+               grpc_debug("%s, posting a request for: %s", __func__, name);
+               auto copy = no_copy ? this
+                                   : new UnaryRpcState(cdb, requestf, callback,
+                                                       name);
+               (service->*requestf)(&copy->ctx, &copy->request,
+                                    &copy->responder, cq, cq, copy);
+       }
+
+       CallState run_mainthread(struct thread *thread) override
+       {
+               // Unary RPC are always finished, see "Unary" :)
+               grpc::Status status = this->callback(this);
+               responder.Finish(response, status, this);
+               return FINISH;
+       }
+
+       Candidates *cdb;
+
        Q request;
        S response;
        grpc::ServerAsyncResponseWriter<S> responder;
-       grpc::ServerAsyncWriter<S> async_responder;
 
-       Candidates *cdb;
-       void (*callback)(NewRpcState<Q, S> *);
+       grpc::Status (*callback)(UnaryRpcState<Q, S> *);
        reqfunc_t requestf = NULL;
-       reqsfunc_t requestsf = NULL;
+};
 
-       pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
-       pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-       void *context = 0;
+/*
+ * The StreamRpcState class is used to track the execution of a Streaming RPC.
+ *
+ * Template Args:
+ *     Q - the request type for a given streaming RPC
+ *     S - the response type for a given streaming RPC
+ *     X - the type used to track the streaming state
+ */
+template <typename Q, typename S, typename X>
+class StreamRpcState : public RpcStateBase
+{
+      public:
+       typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
+               ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
+               ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
+               void *);
 
-       CallState state = CREATE;
+       StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *),
+                      const char *name)
+           : RpcStateBase(name), requestsf(rfunc), callback(cb),
+             async_responder(&ctx){};
+
+       void do_request(::frr::Northbound::AsyncService *service,
+                       ::grpc::ServerCompletionQueue *cq,
+                       bool no_copy) override
+       {
+               grpc_debug("%s, posting a request for: %s", __func__, name);
+               auto copy =
+                       no_copy ? this
+                               : new StreamRpcState(requestsf, callback, name);
+               (service->*requestsf)(&copy->ctx, &copy->request,
+                                     &copy->async_responder, cq, cq, copy);
+       }
+
+       CallState run_mainthread(struct thread *thread) override
+       {
+               if (this->callback(this))
+                       return MORE;
+               else
+                       return FINISH;
+       }
+
+       Q request;
+       S response;
+       grpc::ServerAsyncWriter<S> async_responder;
+
+       bool (*callback)(StreamRpcState<Q, S, X> *);
+       reqsfunc_t requestsf = NULL;
+
+       X context;
 };
 
 // ------------------------------------------------------
@@ -481,15 +549,11 @@ static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
 //       RPC Callback Functions: run on main thread
 // ------------------------------------------------------
 
-void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest,
-                                           frr::GetCapabilitiesResponse> *tag)
+grpc::Status HandleUnaryGetCapabilities(
+       UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse>
+               *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
        // Response: string frr_version = 1;
        tag->response.set_frr_version(FRR_VERSION);
@@ -515,30 +579,24 @@ void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest,
        tag->response.add_supported_encodings(frr::JSON);
        tag->response.add_supported_encodings(frr::XML);
 
-       /* Should we do this in the async process call? */
-       tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-
-       /* Indicate we are done. */
-       tag->state = FINISH;
+       return grpc::Status::OK;
 }
 
-void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag)
-{
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
+// Define the context variable type for this streaming handler
+typedef std::list<std::string> GetContextType;
 
-       if (tag->state == FINISH) {
-               delete static_cast<std::list<std::string> *>(tag->context);
-               tag->state = DELETED;
-               return;
-       }
+bool HandleStreamingGet(
+       StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag)
+{
+       grpc_debug("%s: entered", __func__);
 
-       if (!tag->context) {
-               /* Creating, first time called for this RPC */
-               auto mypaths = new std::list<std::string>();
-               tag->context = mypaths;
+       auto mypathps = &tag->context;
+       if (tag->is_initial_process()) {
+               // Fill our context container first time through
+               grpc_debug("%s: initialize streaming state", __func__);
                auto paths = tag->request.path();
                for (const std::string &path : paths) {
-                       mypaths->push_back(std::string(path));
+                       mypathps->push_back(std::string(path));
                }
        }
 
@@ -549,11 +607,9 @@ void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag)
        // Request: bool with_defaults = 3;
        bool with_defaults = tag->request.with_defaults();
 
-       auto mypathps = static_cast<std::list<std::string> *>(tag->context);
        if (mypathps->empty()) {
                tag->async_responder.Finish(grpc::Status::OK, tag);
-               tag->state = FINISH;
-               return;
+               return false;
        }
 
        frr::GetResponse response;
@@ -571,85 +627,57 @@ void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag)
        if (!status.ok()) {
                tag->async_responder.WriteAndFinish(
                        response, grpc::WriteOptions(), status, tag);
-               tag->state = FINISH;
-               return;
+               return false;
        }
 
        mypathps->pop_back();
        if (mypathps->empty()) {
                tag->async_responder.WriteAndFinish(
                        response, grpc::WriteOptions(), grpc::Status::OK, tag);
-               tag->state = FINISH;
+               return false;
        } else {
                tag->async_responder.Write(response, tag);
-               tag->state = MORE;
+               return true;
        }
 }
 
-void HandleUnaryCreateCandidate(NewRpcState<frr::CreateCandidateRequest,
-                                           frr::CreateCandidateResponse> *tag)
+grpc::Status HandleUnaryCreateCandidate(
+       UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse>
+               *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
        struct candidate *candidate = tag->cdb->create_candidate();
-       if (!candidate) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
-                                    "Can't create candidate configuration"),
-                       tag);
-       } else {
-               tag->response.set_candidate_id(candidate->id);
-               tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-       }
-
-       tag->state = FINISH;
+       if (!candidate)
+               return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
+                                   "Can't create candidate configuration");
+       tag->response.set_candidate_id(candidate->id);
+       return grpc::Status::OK;
 }
 
-void HandleUnaryDeleteCandidate(NewRpcState<frr::DeleteCandidateRequest,
-                                           frr::DeleteCandidateResponse> *tag)
+grpc::Status HandleUnaryDeleteCandidate(
+       UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse>
+               *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
-       // Request: uint32 candidate_id = 1;
        uint32_t candidate_id = tag->request.candidate_id();
 
        grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
 
-       if (!tag->cdb->contains(candidate_id)) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::NOT_FOUND,
-                                    "candidate configuration not found"),
-                       tag);
-       } else {
-               tag->cdb->delete_candidate(candidate_id);
-               tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-       }
-       tag->state = FINISH;
+       if (!tag->cdb->contains(candidate_id))
+               return grpc::Status(grpc::StatusCode::NOT_FOUND,
+                                   "candidate configuration not found");
+       tag->cdb->delete_candidate(candidate_id);
+       return grpc::Status::OK;
 }
 
-void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest,
-                                           frr::UpdateCandidateResponse> *tag)
+grpc::Status HandleUnaryUpdateCandidate(
+       UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse>
+               *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
-       // Request: uint32 candidate_id = 1;
        uint32_t candidate_id = tag->request.candidate_id();
 
        grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
@@ -657,58 +685,33 @@ void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest,
        struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
 
        if (!candidate)
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::NOT_FOUND,
-                                    "candidate configuration not found"),
-                       tag);
-       else if (candidate->transaction)
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(
-                               grpc::StatusCode::FAILED_PRECONDITION,
-                               "candidate is in the middle of a transaction"),
-                       tag);
-       else if (nb_candidate_update(candidate->config) != NB_OK)
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(
-                               grpc::StatusCode::INTERNAL,
-                               "failed to update candidate configuration"),
-                       tag);
-
-       else
-               tag->responder.Finish(tag->response, grpc::Status::OK, tag);
+               return grpc::Status(grpc::StatusCode::NOT_FOUND,
+                                   "candidate configuration not found");
+       if (candidate->transaction)
+               return grpc::Status(
+                       grpc::StatusCode::FAILED_PRECONDITION,
+                       "candidate is in the middle of a transaction");
+       if (nb_candidate_update(candidate->config) != NB_OK)
+               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                   "failed to update candidate configuration");
 
-       tag->state = FINISH;
+       return grpc::Status::OK;
 }
 
-void HandleUnaryEditCandidate(
-       NewRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> *tag)
+grpc::Status HandleUnaryEditCandidate(
+       UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse>
+               *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
-       // Request: uint32 candidate_id = 1;
        uint32_t candidate_id = tag->request.candidate_id();
 
        grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
 
        struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
-
-       if (!candidate) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::NOT_FOUND,
-                                    "candidate configuration not found"),
-                       tag);
-               tag->state = FINISH;
-               return;
-       }
+       if (!candidate)
+               return grpc::Status(grpc::StatusCode::NOT_FOUND,
+                                   "candidate configuration not found");
 
        struct nb_config *candidate_tmp = nb_config_dup(candidate->config);
 
@@ -718,15 +721,9 @@ void HandleUnaryEditCandidate(
                                    pv.value().c_str()) != 0) {
                        nb_config_free(candidate_tmp);
 
-                       tag->responder.Finish(
-                               tag->response,
-                               grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
-                                            "Failed to update \"" + pv.path()
-                                                    + "\""),
-                               tag);
-
-                       tag->state = FINISH;
-                       return;
+                       return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                                           "Failed to update \"" + pv.path() +
+                                                   "\"");
                }
        }
 
@@ -734,36 +731,23 @@ void HandleUnaryEditCandidate(
        for (const frr::PathValue &pv : pvs) {
                if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) {
                        nb_config_free(candidate_tmp);
-                       tag->responder.Finish(
-                               tag->response,
-                               grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
-                                            "Failed to remove \"" + pv.path()
-                                                    + "\""),
-                               tag);
-                       tag->state = FINISH;
-                       return;
+                       return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                                           "Failed to remove \"" + pv.path() +
+                                                   "\"");
                }
        }
 
        // No errors, accept all changes.
        nb_config_replace(candidate->config, candidate_tmp, false);
-
-       tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-
-       tag->state = FINISH;
+       return grpc::Status::OK;
 }
 
-void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest,
-                                           frr::LoadToCandidateResponse> *tag)
+grpc::Status HandleUnaryLoadToCandidate(
+       UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse>
+               *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
-       // Request: uint32 candidate_id = 1;
        uint32_t candidate_id = tag->request.candidate_id();
 
        grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
@@ -773,59 +757,31 @@ void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest,
        // Request: DataTree config = 3;
        auto config = tag->request.config();
 
-
        struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
-
-       if (!candidate) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::NOT_FOUND,
-                                    "candidate configuration not found"),
-                       tag);
-               tag->state = FINISH;
-               return;
-       }
+       if (!candidate)
+               return grpc::Status(grpc::StatusCode::NOT_FOUND,
+                                   "candidate configuration not found");
 
        struct lyd_node *dnode = dnode_from_data_tree(&config, true);
-       if (!dnode) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::INTERNAL,
-                                    "Failed to parse the configuration"),
-                       tag);
-               tag->state = FINISH;
-               return;
-       }
+       if (!dnode)
+               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                   "Failed to parse the configuration");
 
        struct nb_config *loaded_config = nb_config_new(dnode);
-
        if (load_type == frr::LoadToCandidateRequest::REPLACE)
                nb_config_replace(candidate->config, loaded_config, false);
-       else if (nb_config_merge(candidate->config, loaded_config, false)
-                != NB_OK) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(
-                               grpc::StatusCode::INTERNAL,
-                               "Failed to merge the loaded configuration"),
-                       tag);
-               tag->state = FINISH;
-               return;
-       }
+       else if (nb_config_merge(candidate->config, loaded_config, false) !=
+                NB_OK)
+               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                   "Failed to merge the loaded configuration");
 
-       tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-       tag->state = FINISH;
+       return grpc::Status::OK;
 }
 
-void HandleUnaryCommit(
-       NewRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
+grpc::Status
+HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
        // Request: uint32 candidate_id = 1;
        uint32_t candidate_id = tag->request.candidate_id();
@@ -839,15 +795,9 @@ void HandleUnaryCommit(
 
        // Find candidate configuration.
        struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
-       if (!candidate) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::NOT_FOUND,
-                                    "candidate configuration not found"),
-                       tag);
-               tag->state = FINISH;
-               return;
-       }
+       if (!candidate)
+               return grpc::Status(grpc::StatusCode::NOT_FOUND,
+                                   "candidate configuration not found");
 
        int ret = NB_OK;
        uint32_t transaction_id = 0;
@@ -856,29 +806,17 @@ void HandleUnaryCommit(
        switch (phase) {
        case frr::CommitRequest::PREPARE:
        case frr::CommitRequest::ALL:
-               if (candidate->transaction) {
-                       tag->responder.Finish(
-                               tag->response,
-                               grpc::Status(
-                                       grpc::StatusCode::FAILED_PRECONDITION,
-                                       "candidate is in the middle of a transaction"),
-                               tag);
-                       tag->state = FINISH;
-                       return;
-               }
+               if (candidate->transaction)
+                       return grpc::Status(
+                               grpc::StatusCode::FAILED_PRECONDITION,
+                               "candidate is in the middle of a transaction");
                break;
        case frr::CommitRequest::ABORT:
        case frr::CommitRequest::APPLY:
-               if (!candidate->transaction) {
-                       tag->responder.Finish(
-                               tag->response,
-                               grpc::Status(
-                                       grpc::StatusCode::FAILED_PRECONDITION,
-                                       "no transaction in progress"),
-                               tag);
-                       tag->state = FINISH;
-                       return;
-               }
+               if (!candidate->transaction)
+                       return grpc::Status(
+                               grpc::StatusCode::FAILED_PRECONDITION,
+                               "no transaction in progress");
                break;
        default:
                break;
@@ -958,53 +896,30 @@ void HandleUnaryCommit(
        if (strlen(errmsg) > 0)
                tag->response.set_error_message(errmsg);
 
-       tag->responder.Finish(tag->response, status, tag);
-       tag->state = FINISH;
+       return status;
 }
 
-void HandleUnaryLockConfig(
-       NewRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
+grpc::Status HandleUnaryLockConfig(
+       UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
-       if (nb_running_lock(NB_CLIENT_GRPC, NULL)) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
-                                    "running configuration is locked already"),
-                       tag);
-       } else {
-               tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-       }
-       tag->state = FINISH;
+       if (nb_running_lock(NB_CLIENT_GRPC, NULL))
+               return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
+                                   "running configuration is locked already");
+       return grpc::Status::OK;
 }
 
-void HandleUnaryUnlockConfig(
-       NewRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
+grpc::Status HandleUnaryUnlockConfig(
+       UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
-       if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(
-                               grpc::StatusCode::FAILED_PRECONDITION,
-                               "failed to unlock the running configuration"),
-                       tag);
-       } else {
-               tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-       }
-       tag->state = FINISH;
+       if (nb_running_unlock(NB_CLIENT_GRPC, NULL))
+               return grpc::Status(
+                       grpc::StatusCode::FAILED_PRECONDITION,
+                       "failed to unlock the running configuration");
+       return grpc::Status::OK;
 }
 
 static void list_transactions_cb(void *arg, int transaction_id,
@@ -1018,45 +933,34 @@ static void list_transactions_cb(void *arg, int transaction_id,
                                std::string(date), std::string(comment)));
 }
 
-void HandleStreamingListTransactions(
-       NewRpcState<frr::ListTransactionsRequest, frr::ListTransactionsResponse>
-               *tag)
-{
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               delete static_cast<std::list<std::tuple<
-                       int, std::string, std::string, std::string>> *>(
-                       tag->context);
-               tag->state = DELETED;
-               return;
-       }
+// Define the context variable type for this streaming handler
+typedef std::list<std::tuple<int, std::string, std::string, std::string>>
+       ListTransactionsContextType;
 
-       if (!tag->context) {
-               /* Creating, first time called for this RPC */
-               auto new_list =
-                       new std::list<std::tuple<int, std::string, std::string,
-                                                std::string>>();
-               tag->context = new_list;
-               nb_db_transactions_iterate(list_transactions_cb, tag->context);
+bool HandleStreamingListTransactions(
+       StreamRpcState<frr::ListTransactionsRequest,
+                      frr::ListTransactionsResponse,
+                      ListTransactionsContextType> *tag)
+{
+       grpc_debug("%s: entered", __func__);
 
-               new_list->push_back(std::make_tuple(
+       auto list = &tag->context;
+       if (tag->is_initial_process()) {
+               grpc_debug("%s: initialize streaming state", __func__);
+               // Fill our context container first time through
+               nb_db_transactions_iterate(list_transactions_cb, list);
+               list->push_back(std::make_tuple(
                        0xFFFF, std::string("fake client"),
                        std::string("fake date"), std::string("fake comment")));
-               new_list->push_back(
-                       std::make_tuple(0xFFFE, std::string("fake client2"),
-                                       std::string("fake date"),
-                                       std::string("fake comment2")));
+               list->push_back(std::make_tuple(0xFFFE,
+                                               std::string("fake client2"),
+                                               std::string("fake date"),
+                                               std::string("fake comment2")));
        }
 
-       auto list = static_cast<std::list<
-               std::tuple<int, std::string, std::string, std::string>> *>(
-               tag->context);
-
        if (list->empty()) {
                tag->async_responder.Finish(grpc::Status::OK, tag);
-               tag->state = FINISH;
-               return;
+               return false;
        }
 
        auto item = list->back();
@@ -1079,22 +983,18 @@ void HandleStreamingListTransactions(
        if (list->empty()) {
                tag->async_responder.WriteAndFinish(
                        response, grpc::WriteOptions(), grpc::Status::OK, tag);
-               tag->state = FINISH;
+               return false;
        } else {
                tag->async_responder.Write(response, tag);
-               tag->state = MORE;
+               return true;
        }
 }
 
-void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest,
-                                          frr::GetTransactionResponse> *tag)
+grpc::Status HandleUnaryGetTransaction(
+       UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse>
+               *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
        // Request: uint32 transaction_id = 1;
        uint32_t transaction_id = tag->request.transaction_id();
@@ -1110,15 +1010,9 @@ void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest,
 
        // Load configuration from the transactions database.
        nb_config = nb_db_transaction_load(transaction_id);
-       if (!nb_config) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
-                                    "Transaction not found"),
-                       tag);
-               tag->state = FINISH;
-               return;
-       }
+       if (!nb_config)
+               return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                                   "Transaction not found");
 
        // Response: DataTree config = 1;
        auto config = tag->response.mutable_config();
@@ -1129,29 +1023,19 @@ void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest,
                                 encoding2lyd_format(encoding), with_defaults)
            != 0) {
                nb_config_free(nb_config);
-               tag->responder.Finish(tag->response,
-                                     grpc::Status(grpc::StatusCode::INTERNAL,
-                                                  "Failed to dump data"),
-                                     tag);
-               tag->state = FINISH;
-               return;
+               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                   "Failed to dump data");
        }
 
        nb_config_free(nb_config);
 
-       tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-       tag->state = FINISH;
+       return grpc::Status::OK;
 }
 
-void HandleUnaryExecute(
-       NewRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
+grpc::Status HandleUnaryExecute(
+       UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
 {
-       grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
-       if (tag->state == FINISH) {
-               tag->state = DELETED;
-               return;
-       }
+       grpc_debug("%s: entered", __func__);
 
        struct nb_node *nb_node;
        struct list *input_list;
@@ -1166,26 +1050,14 @@ void HandleUnaryExecute(
 
        grpc_debug("%s(path: \"%s\")", __func__, xpath);
 
-       if (tag->request.path().empty()) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
-                                    "Data path is empty"),
-                       tag);
-               tag->state = FINISH;
-               return;
-       }
+       if (tag->request.path().empty())
+               return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                                   "Data path is empty");
 
        nb_node = nb_node_find(xpath);
-       if (!nb_node) {
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
-                                    "Unknown data path"),
-                       tag);
-               tag->state = FINISH;
-               return;
-       }
+       if (!nb_node)
+               return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                                   "Unknown data path");
 
        input_list = yang_data_list_new();
        output_list = yang_data_list_new();
@@ -1207,12 +1079,7 @@ void HandleUnaryExecute(
                list_delete(&input_list);
                list_delete(&output_list);
 
-               tag->responder.Finish(
-                       tag->response,
-                       grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"),
-                       tag);
-               tag->state = FINISH;
-               return;
+               return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed");
        }
 
        // Process output parameters.
@@ -1227,8 +1094,7 @@ void HandleUnaryExecute(
        list_delete(&input_list);
        list_delete(&output_list);
 
-       tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-       tag->state = FINISH;
+       return grpc::Status::OK;
 }
 
 // ------------------------------------------------------
@@ -1238,18 +1104,19 @@ void HandleUnaryExecute(
 
 #define REQUEST_NEWRPC(NAME, cdb)                                              \
        do {                                                                   \
-               auto _rpcState = new NewRpcState<frr::NAME##Request,           \
-                                                frr::NAME##Response>(         \
+               auto _rpcState = new UnaryRpcState<frr::NAME##Request,         \
+                                                  frr::NAME##Response>(       \
                        (cdb), &frr::Northbound::AsyncService::Request##NAME,  \
                        &HandleUnary##NAME, #NAME);                            \
                _rpcState->do_request(&service, cq.get(), true);               \
        } while (0)
 
-#define REQUEST_NEWRPC_STREAMING(NAME, cdb)                                    \
+#define REQUEST_NEWRPC_STREAMING(NAME)                                         \
        do {                                                                   \
-               auto _rpcState = new NewRpcState<frr::NAME##Request,           \
-                                                frr::NAME##Response>(         \
-                       (cdb), &frr::Northbound::AsyncService::Request##NAME,  \
+               auto _rpcState = new StreamRpcState<frr::NAME##Request,        \
+                                                   frr::NAME##Response,       \
+                                                   NAME##ContextType>(        \
+                       &frr::Northbound::AsyncService::Request##NAME,         \
                        &HandleStreaming##NAME, #NAME);                        \
                _rpcState->do_request(&service, cq.get(), true);               \
        } while (0)
@@ -1288,7 +1155,7 @@ static void *grpc_pthread_start(void *arg)
 
        grpc_running = true;
 
-       /* Schedule all RPC handlers */
+       /* Schedule unary RPC handlers */
        REQUEST_NEWRPC(GetCapabilities, NULL);
        REQUEST_NEWRPC(CreateCandidate, &candidates);
        REQUEST_NEWRPC(DeleteCandidate, &candidates);
@@ -1300,8 +1167,10 @@ static void *grpc_pthread_start(void *arg)
        REQUEST_NEWRPC(LockConfig, NULL);
        REQUEST_NEWRPC(UnlockConfig, NULL);
        REQUEST_NEWRPC(Execute, NULL);
-       REQUEST_NEWRPC_STREAMING(Get, NULL);
-       REQUEST_NEWRPC_STREAMING(ListTransactions, NULL);
+
+       /* Schedule streaming RPC handlers */
+       REQUEST_NEWRPC_STREAMING(Get);
+       REQUEST_NEWRPC_STREAMING(ListTransactions);
 
        zlog_notice("gRPC server listening on %s",
                    server_address.str().c_str());
@@ -1309,7 +1178,7 @@ static void *grpc_pthread_start(void *arg)
        /* Process inbound RPCs */
        bool ok;
        void *tag;
-       while (grpc_running) {
+       while (true) {
                if (!cq->Next(&tag, &ok)) {
                        grpc_debug("%s: CQ empty exiting", __func__);
                        break;
@@ -1318,25 +1187,18 @@ static void *grpc_pthread_start(void *arg)
                grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag,
                           ok);
 
-               if (!ok || !grpc_running) {
+               if (!ok) {
                        delete static_cast<RpcStateBase *>(tag);
                        break;
                }
 
                RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
-               CallState state = rpc->doCallback();
-               grpc_debug("%s: callback returned RPC State: %s", __func__,
-                          call_states[state]);
-
-               /*
-                * Our side is done (FINISH) receive new requests of this type
-                * We could do this earlier but that would mean we could be
-                * handling multiple same type requests in parallel. We expect
-                * to be called back once more in the FINISH state (from the
-                * user indicating Finish() for cleanup.
-                */
-               if (state == FINISH && grpc_running)
-                       rpc->do_request(&service, cq.get(), false);
+               if (rpc->get_state() != FINISH)
+                       rpc->run(&service, cq.get());
+               else {
+                       grpc_debug("%s RPC FINISH -> [delete]", rpc->name);
+                       delete rpc;
+               }
        }
 
        /* This was probably done for us to get here, but let's be safe */