#define GRPC_DEFAULT_PORT 50051
+
+// ------------------------------------------------------
+// File Local Variables
+// ------------------------------------------------------
+
/*
* NOTE: we can't use the FRR debugging infrastructure here since it uses
* atomics and C++ has a different atomics API. Enable gRPC debugging
std::map<uint64_t, struct candidate> _cdb;
};
+/*
+ * RpcStateBase is the common base class used to track a gRPC RPC.
+ */
class RpcStateBase
{
public:
- virtual ~RpcStateBase() = default;
- virtual CallState doCallback() = 0;
virtual void do_request(::frr::Northbound::AsyncService *service,
::grpc::ServerCompletionQueue *cq,
bool no_copy) = 0;
-};
-/*
- * The RPC state class is used to track the execution of an RPC.
- */
-template <typename Q, typename S> class NewRpcState : RpcStateBase
-{
- typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
- ::grpc::ServerContext *, Q *,
- ::grpc::ServerAsyncResponseWriter<S> *,
- ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
- void *);
- typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
- ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
- ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
- void *);
+ RpcStateBase(const char *name) : name(name){};
- public:
- NewRpcState(Candidates *cdb, reqfunc_t rfunc,
- void (*cb)(NewRpcState<Q, S> *), const char *name)
- : requestf(rfunc), callback(cb), responder(&ctx),
- async_responder(&ctx), name(name), cdb(cdb){};
- NewRpcState(Candidates *cdb, reqsfunc_t rfunc,
- void (*cb)(NewRpcState<Q, S> *), const char *name)
- : requestsf(rfunc), callback(cb), responder(&ctx),
- async_responder(&ctx), name(name), cdb(cdb){};
-
- CallState doCallback() override
+ virtual ~RpcStateBase() = default;
+
+ CallState get_state() const
{
- CallState enter_state = this->state;
- CallState new_state;
- if (enter_state == FINISH) {
- grpc_debug("%s RPC FINISH -> DELETED", name);
- new_state = FINISH;
- } else {
- grpc_debug("%s RPC: %s -> PROCESS", name,
- call_states[this->state]);
- new_state = PROCESS;
- }
+ return state;
+ }
+
+ bool is_initial_process() const
+ {
+ /* Will always be true for Unary */
+ return entered_state == CREATE;
+ }
+
+ // Returns "more" status, if false caller can delete
+ bool run(frr::Northbound::AsyncService *service,
+ grpc::ServerCompletionQueue *cq)
+ {
+ /*
+ * We enter in either CREATE or MORE state, and transition to
+ * PROCESS state.
+ */
+ this->entered_state = this->state;
+ this->state = PROCESS;
+ grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name,
+ call_states[this->entered_state],
+ call_states[this->state]);
/*
- * We are either in state CREATE, MORE or FINISH. If CREATE or
- * MORE move back to PROCESS, otherwise we are cleaning up
- * (FINISH) so leave it in that state. Run the callback on the
- * main threadmaster/pthread; and wait for expected transition
- * from main thread. If transition is to FINISH->DELETED.
- * delete us.
- *
- * We update the state prior to scheduling the callback which
- * may then update the state in the master pthread. Then we
- * obtain the lock in the condvar-check-loop as the callback
- * will be modifying updating the state value.
+ * We schedule the callback on the main pthread, and wait for
+ * the state to transition out of the PROCESS state. The new
+ * state will either be MORE or FINISH. It will always be FINISH
+ * for Unary RPCs.
*/
- this->state = new_state;
thread_add_event(main_master, c_callback, (void *)this, 0,
NULL);
+
pthread_mutex_lock(&this->cmux);
- while (this->state == new_state)
+ while (this->state == PROCESS)
pthread_cond_wait(&this->cond, &this->cmux);
pthread_mutex_unlock(&this->cmux);
- if (enter_state == FINISH)
- assert(this->state == DELETED);
-
- if (this->state == DELETED) {
- grpc_debug("%s RPC: -> [DELETED]", name);
- delete this;
- return DELETED;
- }
- return this->state;
- }
-
- void do_request(::frr::Northbound::AsyncService *service,
- ::grpc::ServerCompletionQueue *cq,
- bool no_copy) override
- {
- grpc_debug("%s, posting a request for: %s", __func__, name);
- if (requestf) {
- NewRpcState<Q, S> *copy =
- no_copy ? this
- : new NewRpcState(cdb, requestf,
- callback, name);
- (service->*requestf)(©->ctx, ©->request,
- ©->responder, cq, cq, copy);
- } else {
- NewRpcState<Q, S> *copy =
- no_copy ? this
- : new NewRpcState(cdb, requestsf,
- callback, name);
- (service->*requestsf)(©->ctx, ©->request,
- ©->async_responder, cq, cq,
- copy);
+ grpc_debug("%s RPC in %s on grpc-io-thread", name,
+ call_states[this->state]);
+
+ if (this->state == FINISH) {
+ /*
+ * Server is done (FINISH) so prep to receive a new
+ * request of this type. We could do this earlier but
+ * that would mean we could be handling multiple same
+ * type requests in parallel without limit.
+ */
+ this->do_request(service, cq, false);
}
+ return true;
}
+ protected:
+ virtual CallState run_mainthread(struct thread *thread) = 0;
static void c_callback(struct thread *thread)
{
- auto _tag = static_cast<NewRpcState<Q, S> *>(thread->arg);
+ auto _tag = static_cast<RpcStateBase *>(thread->arg);
/*
* We hold the lock until the callback finishes and has updated
* _tag->state, then we signal done and release.
pthread_mutex_lock(&_tag->cmux);
CallState enter_state = _tag->state;
- grpc_debug("%s RPC running on main thread", _tag->name);
+ grpc_debug("%s RPC: running %s on main thread", _tag->name,
+ call_states[enter_state]);
- _tag->callback(_tag);
+ _tag->state = _tag->run_mainthread(thread);
- grpc_debug("%s RPC: %s -> %s", _tag->name,
+ grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name,
call_states[enter_state], call_states[_tag->state]);
pthread_cond_signal(&_tag->cond);
return;
}
- const char *name;
grpc::ServerContext ctx;
+ pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
+ pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+ CallState state = CREATE;
+ CallState entered_state;
+
+ public:
+ const char *name;
+};
+
+/*
+ * The UnaryRpcState class is used to track the execution of a Unary RPC.
+ *
+ * Template Args:
+ * Q - the request type for a given unary RPC
+ * S - the response type for a given unary RPC
+ */
+template <typename Q, typename S> class UnaryRpcState : public RpcStateBase
+{
+ public:
+ typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
+ ::grpc::ServerContext *, Q *,
+ ::grpc::ServerAsyncResponseWriter<S> *,
+ ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
+ void *);
+
+ UnaryRpcState(Candidates *cdb, reqfunc_t rfunc,
+ grpc::Status (*cb)(UnaryRpcState<Q, S> *),
+ const char *name)
+ : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb),
+ responder(&ctx){};
+
+ void do_request(::frr::Northbound::AsyncService *service,
+ ::grpc::ServerCompletionQueue *cq,
+ bool no_copy) override
+ {
+ grpc_debug("%s, posting a request for: %s", __func__, name);
+ auto copy = no_copy ? this
+ : new UnaryRpcState(cdb, requestf, callback,
+ name);
+ (service->*requestf)(©->ctx, ©->request,
+ ©->responder, cq, cq, copy);
+ }
+
+ CallState run_mainthread(struct thread *thread) override
+ {
+ // Unary RPC are always finished, see "Unary" :)
+ grpc::Status status = this->callback(this);
+ responder.Finish(response, status, this);
+ return FINISH;
+ }
+
+ Candidates *cdb;
+
Q request;
S response;
grpc::ServerAsyncResponseWriter<S> responder;
- grpc::ServerAsyncWriter<S> async_responder;
- Candidates *cdb;
- void (*callback)(NewRpcState<Q, S> *);
+ grpc::Status (*callback)(UnaryRpcState<Q, S> *);
reqfunc_t requestf = NULL;
- reqsfunc_t requestsf = NULL;
+};
- pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
- pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- void *context = 0;
+/*
+ * The StreamRpcState class is used to track the execution of a Streaming RPC.
+ *
+ * Template Args:
+ * Q - the request type for a given streaming RPC
+ * S - the response type for a given streaming RPC
+ * X - the type used to track the streaming state
+ */
+template <typename Q, typename S, typename X>
+class StreamRpcState : public RpcStateBase
+{
+ public:
+ typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
+ ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
+ ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
+ void *);
- CallState state = CREATE;
+ StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *),
+ const char *name)
+ : RpcStateBase(name), requestsf(rfunc), callback(cb),
+ async_responder(&ctx){};
+
+ void do_request(::frr::Northbound::AsyncService *service,
+ ::grpc::ServerCompletionQueue *cq,
+ bool no_copy) override
+ {
+ grpc_debug("%s, posting a request for: %s", __func__, name);
+ auto copy =
+ no_copy ? this
+ : new StreamRpcState(requestsf, callback, name);
+ (service->*requestsf)(©->ctx, ©->request,
+ ©->async_responder, cq, cq, copy);
+ }
+
+ CallState run_mainthread(struct thread *thread) override
+ {
+ if (this->callback(this))
+ return MORE;
+ else
+ return FINISH;
+ }
+
+ Q request;
+ S response;
+ grpc::ServerAsyncWriter<S> async_responder;
+
+ bool (*callback)(StreamRpcState<Q, S, X> *);
+ reqsfunc_t requestsf = NULL;
+
+ X context;
};
// ------------------------------------------------------
// RPC Callback Functions: run on main thread
// ------------------------------------------------------
-void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest,
- frr::GetCapabilitiesResponse> *tag)
+grpc::Status HandleUnaryGetCapabilities(
+ UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse>
+ *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
// Response: string frr_version = 1;
tag->response.set_frr_version(FRR_VERSION);
tag->response.add_supported_encodings(frr::JSON);
tag->response.add_supported_encodings(frr::XML);
- /* Should we do this in the async process call? */
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-
- /* Indicate we are done. */
- tag->state = FINISH;
+ return grpc::Status::OK;
}
-void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag)
-{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
+// Define the context variable type for this streaming handler
+typedef std::list<std::string> GetContextType;
- if (tag->state == FINISH) {
- delete static_cast<std::list<std::string> *>(tag->context);
- tag->state = DELETED;
- return;
- }
+bool HandleStreamingGet(
+ StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag)
+{
+ grpc_debug("%s: entered", __func__);
- if (!tag->context) {
- /* Creating, first time called for this RPC */
- auto mypaths = new std::list<std::string>();
- tag->context = mypaths;
+ auto mypathps = &tag->context;
+ if (tag->is_initial_process()) {
+ // Fill our context container first time through
+ grpc_debug("%s: initialize streaming state", __func__);
auto paths = tag->request.path();
for (const std::string &path : paths) {
- mypaths->push_back(std::string(path));
+ mypathps->push_back(std::string(path));
}
}
// Request: bool with_defaults = 3;
bool with_defaults = tag->request.with_defaults();
- auto mypathps = static_cast<std::list<std::string> *>(tag->context);
if (mypathps->empty()) {
tag->async_responder.Finish(grpc::Status::OK, tag);
- tag->state = FINISH;
- return;
+ return false;
}
frr::GetResponse response;
if (!status.ok()) {
tag->async_responder.WriteAndFinish(
response, grpc::WriteOptions(), status, tag);
- tag->state = FINISH;
- return;
+ return false;
}
mypathps->pop_back();
if (mypathps->empty()) {
tag->async_responder.WriteAndFinish(
response, grpc::WriteOptions(), grpc::Status::OK, tag);
- tag->state = FINISH;
+ return false;
} else {
tag->async_responder.Write(response, tag);
- tag->state = MORE;
+ return true;
}
}
-void HandleUnaryCreateCandidate(NewRpcState<frr::CreateCandidateRequest,
- frr::CreateCandidateResponse> *tag)
+grpc::Status HandleUnaryCreateCandidate(
+ UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse>
+ *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
struct candidate *candidate = tag->cdb->create_candidate();
- if (!candidate) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
- "Can't create candidate configuration"),
- tag);
- } else {
- tag->response.set_candidate_id(candidate->id);
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
- }
-
- tag->state = FINISH;
+ if (!candidate)
+ return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
+ "Can't create candidate configuration");
+ tag->response.set_candidate_id(candidate->id);
+ return grpc::Status::OK;
}
-void HandleUnaryDeleteCandidate(NewRpcState<frr::DeleteCandidateRequest,
- frr::DeleteCandidateResponse> *tag)
+grpc::Status HandleUnaryDeleteCandidate(
+ UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse>
+ *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
- // Request: uint32 candidate_id = 1;
uint32_t candidate_id = tag->request.candidate_id();
grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
- if (!tag->cdb->contains(candidate_id)) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- } else {
- tag->cdb->delete_candidate(candidate_id);
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
- }
- tag->state = FINISH;
+ if (!tag->cdb->contains(candidate_id))
+ return grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found");
+ tag->cdb->delete_candidate(candidate_id);
+ return grpc::Status::OK;
}
-void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest,
- frr::UpdateCandidateResponse> *tag)
+grpc::Status HandleUnaryUpdateCandidate(
+ UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse>
+ *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
- // Request: uint32 candidate_id = 1;
uint32_t candidate_id = tag->request.candidate_id();
grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
if (!candidate)
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- else if (candidate->transaction)
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::FAILED_PRECONDITION,
- "candidate is in the middle of a transaction"),
- tag);
- else if (nb_candidate_update(candidate->config) != NB_OK)
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::INTERNAL,
- "failed to update candidate configuration"),
- tag);
-
- else
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
+ return grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found");
+ if (candidate->transaction)
+ return grpc::Status(
+ grpc::StatusCode::FAILED_PRECONDITION,
+ "candidate is in the middle of a transaction");
+ if (nb_candidate_update(candidate->config) != NB_OK)
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ "failed to update candidate configuration");
- tag->state = FINISH;
+ return grpc::Status::OK;
}
-void HandleUnaryEditCandidate(
- NewRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> *tag)
+grpc::Status HandleUnaryEditCandidate(
+ UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse>
+ *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
- // Request: uint32 candidate_id = 1;
uint32_t candidate_id = tag->request.candidate_id();
grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
-
- if (!candidate) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- tag->state = FINISH;
- return;
- }
+ if (!candidate)
+ return grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found");
struct nb_config *candidate_tmp = nb_config_dup(candidate->config);
pv.value().c_str()) != 0) {
nb_config_free(candidate_tmp);
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- "Failed to update \"" + pv.path()
- + "\""),
- tag);
-
- tag->state = FINISH;
- return;
+ return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Failed to update \"" + pv.path() +
+ "\"");
}
}
for (const frr::PathValue &pv : pvs) {
if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) {
nb_config_free(candidate_tmp);
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- "Failed to remove \"" + pv.path()
- + "\""),
- tag);
- tag->state = FINISH;
- return;
+ return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Failed to remove \"" + pv.path() +
+ "\"");
}
}
// No errors, accept all changes.
nb_config_replace(candidate->config, candidate_tmp, false);
-
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
-
- tag->state = FINISH;
+ return grpc::Status::OK;
}
-void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest,
- frr::LoadToCandidateResponse> *tag)
+grpc::Status HandleUnaryLoadToCandidate(
+ UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse>
+ *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
- // Request: uint32 candidate_id = 1;
uint32_t candidate_id = tag->request.candidate_id();
grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
// Request: DataTree config = 3;
auto config = tag->request.config();
-
struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
-
- if (!candidate) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- tag->state = FINISH;
- return;
- }
+ if (!candidate)
+ return grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found");
struct lyd_node *dnode = dnode_from_data_tree(&config, true);
- if (!dnode) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::INTERNAL,
- "Failed to parse the configuration"),
- tag);
- tag->state = FINISH;
- return;
- }
+ if (!dnode)
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ "Failed to parse the configuration");
struct nb_config *loaded_config = nb_config_new(dnode);
-
if (load_type == frr::LoadToCandidateRequest::REPLACE)
nb_config_replace(candidate->config, loaded_config, false);
- else if (nb_config_merge(candidate->config, loaded_config, false)
- != NB_OK) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Failed to merge the loaded configuration"),
- tag);
- tag->state = FINISH;
- return;
- }
+ else if (nb_config_merge(candidate->config, loaded_config, false) !=
+ NB_OK)
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ "Failed to merge the loaded configuration");
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
- tag->state = FINISH;
+ return grpc::Status::OK;
}
-void HandleUnaryCommit(
- NewRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
+grpc::Status
+HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
// Request: uint32 candidate_id = 1;
uint32_t candidate_id = tag->request.candidate_id();
// Find candidate configuration.
struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
- if (!candidate) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- tag->state = FINISH;
- return;
- }
+ if (!candidate)
+ return grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found");
int ret = NB_OK;
uint32_t transaction_id = 0;
switch (phase) {
case frr::CommitRequest::PREPARE:
case frr::CommitRequest::ALL:
- if (candidate->transaction) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::FAILED_PRECONDITION,
- "candidate is in the middle of a transaction"),
- tag);
- tag->state = FINISH;
- return;
- }
+ if (candidate->transaction)
+ return grpc::Status(
+ grpc::StatusCode::FAILED_PRECONDITION,
+ "candidate is in the middle of a transaction");
break;
case frr::CommitRequest::ABORT:
case frr::CommitRequest::APPLY:
- if (!candidate->transaction) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::FAILED_PRECONDITION,
- "no transaction in progress"),
- tag);
- tag->state = FINISH;
- return;
- }
+ if (!candidate->transaction)
+ return grpc::Status(
+ grpc::StatusCode::FAILED_PRECONDITION,
+ "no transaction in progress");
break;
default:
break;
if (strlen(errmsg) > 0)
tag->response.set_error_message(errmsg);
- tag->responder.Finish(tag->response, status, tag);
- tag->state = FINISH;
+ return status;
}
-void HandleUnaryLockConfig(
- NewRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
+grpc::Status HandleUnaryLockConfig(
+ UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
- if (nb_running_lock(NB_CLIENT_GRPC, NULL)) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
- "running configuration is locked already"),
- tag);
- } else {
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
- }
- tag->state = FINISH;
+ if (nb_running_lock(NB_CLIENT_GRPC, NULL))
+ return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
+ "running configuration is locked already");
+ return grpc::Status::OK;
}
-void HandleUnaryUnlockConfig(
- NewRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
+grpc::Status HandleUnaryUnlockConfig(
+ UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
- if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::FAILED_PRECONDITION,
- "failed to unlock the running configuration"),
- tag);
- } else {
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
- }
- tag->state = FINISH;
+ if (nb_running_unlock(NB_CLIENT_GRPC, NULL))
+ return grpc::Status(
+ grpc::StatusCode::FAILED_PRECONDITION,
+ "failed to unlock the running configuration");
+ return grpc::Status::OK;
}
static void list_transactions_cb(void *arg, int transaction_id,
std::string(date), std::string(comment)));
}
-void HandleStreamingListTransactions(
- NewRpcState<frr::ListTransactionsRequest, frr::ListTransactionsResponse>
- *tag)
-{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- delete static_cast<std::list<std::tuple<
- int, std::string, std::string, std::string>> *>(
- tag->context);
- tag->state = DELETED;
- return;
- }
+// Define the context variable type for this streaming handler
+typedef std::list<std::tuple<int, std::string, std::string, std::string>>
+ ListTransactionsContextType;
- if (!tag->context) {
- /* Creating, first time called for this RPC */
- auto new_list =
- new std::list<std::tuple<int, std::string, std::string,
- std::string>>();
- tag->context = new_list;
- nb_db_transactions_iterate(list_transactions_cb, tag->context);
+bool HandleStreamingListTransactions(
+ StreamRpcState<frr::ListTransactionsRequest,
+ frr::ListTransactionsResponse,
+ ListTransactionsContextType> *tag)
+{
+ grpc_debug("%s: entered", __func__);
- new_list->push_back(std::make_tuple(
+ auto list = &tag->context;
+ if (tag->is_initial_process()) {
+ grpc_debug("%s: initialize streaming state", __func__);
+ // Fill our context container first time through
+ nb_db_transactions_iterate(list_transactions_cb, list);
+ list->push_back(std::make_tuple(
0xFFFF, std::string("fake client"),
std::string("fake date"), std::string("fake comment")));
- new_list->push_back(
- std::make_tuple(0xFFFE, std::string("fake client2"),
- std::string("fake date"),
- std::string("fake comment2")));
+ list->push_back(std::make_tuple(0xFFFE,
+ std::string("fake client2"),
+ std::string("fake date"),
+ std::string("fake comment2")));
}
- auto list = static_cast<std::list<
- std::tuple<int, std::string, std::string, std::string>> *>(
- tag->context);
-
if (list->empty()) {
tag->async_responder.Finish(grpc::Status::OK, tag);
- tag->state = FINISH;
- return;
+ return false;
}
auto item = list->back();
if (list->empty()) {
tag->async_responder.WriteAndFinish(
response, grpc::WriteOptions(), grpc::Status::OK, tag);
- tag->state = FINISH;
+ return false;
} else {
tag->async_responder.Write(response, tag);
- tag->state = MORE;
+ return true;
}
}
-void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest,
- frr::GetTransactionResponse> *tag)
+grpc::Status HandleUnaryGetTransaction(
+ UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse>
+ *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
// Request: uint32 transaction_id = 1;
uint32_t transaction_id = tag->request.transaction_id();
// Load configuration from the transactions database.
nb_config = nb_db_transaction_load(transaction_id);
- if (!nb_config) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- "Transaction not found"),
- tag);
- tag->state = FINISH;
- return;
- }
+ if (!nb_config)
+ return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Transaction not found");
// Response: DataTree config = 1;
auto config = tag->response.mutable_config();
encoding2lyd_format(encoding), with_defaults)
!= 0) {
nb_config_free(nb_config);
- tag->responder.Finish(tag->response,
- grpc::Status(grpc::StatusCode::INTERNAL,
- "Failed to dump data"),
- tag);
- tag->state = FINISH;
- return;
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ "Failed to dump data");
}
nb_config_free(nb_config);
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
- tag->state = FINISH;
+ return grpc::Status::OK;
}
-void HandleUnaryExecute(
- NewRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
+grpc::Status HandleUnaryExecute(
+ UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
{
- grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
-
- if (tag->state == FINISH) {
- tag->state = DELETED;
- return;
- }
+ grpc_debug("%s: entered", __func__);
struct nb_node *nb_node;
struct list *input_list;
grpc_debug("%s(path: \"%s\")", __func__, xpath);
- if (tag->request.path().empty()) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- "Data path is empty"),
- tag);
- tag->state = FINISH;
- return;
- }
+ if (tag->request.path().empty())
+ return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Data path is empty");
nb_node = nb_node_find(xpath);
- if (!nb_node) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- "Unknown data path"),
- tag);
- tag->state = FINISH;
- return;
- }
+ if (!nb_node)
+ return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Unknown data path");
input_list = yang_data_list_new();
output_list = yang_data_list_new();
list_delete(&input_list);
list_delete(&output_list);
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"),
- tag);
- tag->state = FINISH;
- return;
+ return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed");
}
// Process output parameters.
list_delete(&input_list);
list_delete(&output_list);
- tag->responder.Finish(tag->response, grpc::Status::OK, tag);
- tag->state = FINISH;
+ return grpc::Status::OK;
}
// ------------------------------------------------------
#define REQUEST_NEWRPC(NAME, cdb) \
do { \
- auto _rpcState = new NewRpcState<frr::NAME##Request, \
- frr::NAME##Response>( \
+ auto _rpcState = new UnaryRpcState<frr::NAME##Request, \
+ frr::NAME##Response>( \
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
&HandleUnary##NAME, #NAME); \
_rpcState->do_request(&service, cq.get(), true); \
} while (0)
-#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \
+#define REQUEST_NEWRPC_STREAMING(NAME) \
do { \
- auto _rpcState = new NewRpcState<frr::NAME##Request, \
- frr::NAME##Response>( \
- (cdb), &frr::Northbound::AsyncService::Request##NAME, \
+ auto _rpcState = new StreamRpcState<frr::NAME##Request, \
+ frr::NAME##Response, \
+ NAME##ContextType>( \
+ &frr::Northbound::AsyncService::Request##NAME, \
&HandleStreaming##NAME, #NAME); \
_rpcState->do_request(&service, cq.get(), true); \
} while (0)
grpc_running = true;
- /* Schedule all RPC handlers */
+ /* Schedule unary RPC handlers */
REQUEST_NEWRPC(GetCapabilities, NULL);
REQUEST_NEWRPC(CreateCandidate, &candidates);
REQUEST_NEWRPC(DeleteCandidate, &candidates);
REQUEST_NEWRPC(LockConfig, NULL);
REQUEST_NEWRPC(UnlockConfig, NULL);
REQUEST_NEWRPC(Execute, NULL);
- REQUEST_NEWRPC_STREAMING(Get, NULL);
- REQUEST_NEWRPC_STREAMING(ListTransactions, NULL);
+
+ /* Schedule streaming RPC handlers */
+ REQUEST_NEWRPC_STREAMING(Get);
+ REQUEST_NEWRPC_STREAMING(ListTransactions);
zlog_notice("gRPC server listening on %s",
server_address.str().c_str());
/* Process inbound RPCs */
bool ok;
void *tag;
- while (grpc_running) {
+ while (true) {
if (!cq->Next(&tag, &ok)) {
grpc_debug("%s: CQ empty exiting", __func__);
break;
grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag,
ok);
- if (!ok || !grpc_running) {
+ if (!ok) {
delete static_cast<RpcStateBase *>(tag);
break;
}
RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
- CallState state = rpc->doCallback();
- grpc_debug("%s: callback returned RPC State: %s", __func__,
- call_states[state]);
-
- /*
- * Our side is done (FINISH) receive new requests of this type
- * We could do this earlier but that would mean we could be
- * handling multiple same type requests in parallel. We expect
- * to be called back once more in the FINISH state (from the
- * user indicating Finish() for cleanup.
- */
- if (state == FINISH && grpc_running)
- rpc->do_request(&service, cq.get(), false);
+ if (rpc->get_state() != FINISH)
+ rpc->run(&service, cq.get());
+ else {
+ grpc_debug("%s RPC FINISH -> [delete]", rpc->name);
+ delete rpc;
+ }
}
/* This was probably done for us to get here, but let's be safe */