diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/json.c | 13 | ||||
| -rw-r--r-- | lib/json.h | 22 | ||||
| -rw-r--r-- | lib/log_vty.c | 1 | ||||
| -rw-r--r-- | lib/northbound_grpc.cpp | 829 | ||||
| -rw-r--r-- | lib/prefix.c | 37 | ||||
| -rw-r--r-- | lib/routemap.c | 34 | ||||
| -rw-r--r-- | lib/sockunion.c | 15 | ||||
| -rw-r--r-- | lib/sockunion.h | 1 | ||||
| -rw-r--r-- | lib/typerb.c | 35 | ||||
| -rw-r--r-- | lib/typerb.h | 22 | ||||
| -rw-r--r-- | lib/typesafe.h | 48 | ||||
| -rw-r--r-- | lib/wheel.c | 1 | ||||
| -rw-r--r-- | lib/zclient.c | 19 | ||||
| -rw-r--r-- | lib/zclient.h | 12 |
14 files changed, 578 insertions, 511 deletions
diff --git a/lib/json.c b/lib/json.c index 854a3d59d1..d85a21215c 100644 --- a/lib/json.c +++ b/lib/json.c @@ -74,6 +74,19 @@ void json_object_string_addv(struct json_object *obj, const char *key, json_object_object_add(obj, key, json_object_new_stringv(fmt, args)); } +void json_object_object_addv(struct json_object *parent, + struct json_object *child, const char *keyfmt, + va_list args) +{ + char *text, buf[256]; + + text = vasnprintfrr(MTYPE_TMP, buf, sizeof(buf), keyfmt, args); + json_object_object_add(parent, text, child); + + if (text != buf) + XFREE(MTYPE_TMP, text); +} + void json_object_int_add(struct json_object *obj, const char *key, int64_t i) { json_object_object_add(obj, key, json_object_new_int64(i)); diff --git a/lib/json.h b/lib/json.h index fcaa84c816..78c3836515 100644 --- a/lib/json.h +++ b/lib/json.h @@ -116,6 +116,28 @@ static inline struct json_object *json_object_new_stringf(const char *fmt, ...) return ret; } +/* NOTE: argument order differs! (due to varargs) + * json_object_object_add(parent, key, child) + * json_object_object_addv(parent, child, key, va) + * json_object_object_addf(parent, child, key, ...) + * (would be weird to have the child inbetween the format string and args) + */ +PRINTFRR(3, 0) +extern void json_object_object_addv(struct json_object *parent, + struct json_object *child, + const char *keyfmt, va_list args); +PRINTFRR(3, 4) +static inline void json_object_object_addf(struct json_object *parent, + struct json_object *child, + const char *keyfmt, ...) +{ + va_list args; + + va_start(args, keyfmt); + json_object_object_addv(parent, child, keyfmt, args); + va_end(args); +} + #define JSON_STR "JavaScript Object Notation\n" /* NOTE: json-c lib has following commit 316da85 which diff --git a/lib/log_vty.c b/lib/log_vty.c index ef33a39d4a..81280f302f 100644 --- a/lib/log_vty.c +++ b/lib/log_vty.c @@ -159,6 +159,7 @@ void zlog_rotate(void) { zlog_file_rotate(&zt_file); zlog_file_rotate(&zt_filterfile.parent); + zlog_file_rotate(&zt_file_cmdline); hook_call(zlog_rotate); } diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index e2a6290035..dda1756214 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -39,6 +39,11 @@ #define GRPC_DEFAULT_PORT 50051 + +// ------------------------------------------------------ +// File Local Variables +// ------------------------------------------------------ + /* * NOTE: we can't use the FRR debugging infrastructure here since it uses * atomics and C++ has a different atomics API. Enable gRPC debugging @@ -78,7 +83,7 @@ class Candidates { // Delete candidates. for (auto it = _cdb.begin(); it != _cdb.end(); it++) - delete_candidate(&it->second); + delete_candidate(it->first); } struct candidate *create_candidate(void) @@ -94,8 +99,14 @@ class Candidates return c; } - void delete_candidate(struct candidate *c) + bool contains(uint64_t candidate_id) { + return _cdb.count(candidate_id) > 0; + } + + void delete_candidate(uint64_t candidate_id) + { + struct candidate *c = &_cdb[candidate_id]; char errmsg[BUFSIZ] = {0}; nb_config_free(c->config); @@ -105,119 +116,89 @@ class Candidates _cdb.erase(c->id); } - struct candidate *get_candidate(uint32_t id) + struct candidate *get_candidate(uint64_t id) { return _cdb.count(id) == 0 ? NULL : &_cdb[id]; } private: uint64_t _next_id = 0; - std::map<uint32_t, struct candidate> _cdb; + std::map<uint64_t, struct candidate> _cdb; }; +/* + * RpcStateBase is the common base class used to track a gRPC RPC. + */ class RpcStateBase { public: - virtual ~RpcStateBase() = default; - virtual CallState doCallback() = 0; virtual void do_request(::frr::Northbound::AsyncService *service, ::grpc::ServerCompletionQueue *cq, bool no_copy) = 0; -}; -/* - * The RPC state class is used to track the execution of an RPC. - */ -template <typename Q, typename S> class NewRpcState : RpcStateBase -{ - typedef void (frr::Northbound::AsyncService::*reqfunc_t)( - ::grpc::ServerContext *, Q *, - ::grpc::ServerAsyncResponseWriter<S> *, - ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, - void *); - typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( - ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *, - ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, - void *); + RpcStateBase(const char *name) : name(name){}; - public: - NewRpcState(Candidates *cdb, reqfunc_t rfunc, - void (*cb)(NewRpcState<Q, S> *), const char *name) - : requestf(rfunc), callback(cb), responder(&ctx), - async_responder(&ctx), name(name), cdb(cdb){}; - NewRpcState(Candidates *cdb, reqsfunc_t rfunc, - void (*cb)(NewRpcState<Q, S> *), const char *name) - : requestsf(rfunc), callback(cb), responder(&ctx), - async_responder(&ctx), name(name), cdb(cdb){}; - - CallState doCallback() override + virtual ~RpcStateBase() = default; + + CallState get_state() const + { + return state; + } + + bool is_initial_process() const + { + /* Will always be true for Unary */ + return entered_state == CREATE; + } + + // Returns "more" status, if false caller can delete + bool run(frr::Northbound::AsyncService *service, + grpc::ServerCompletionQueue *cq) { - CallState enter_state = this->state; - CallState new_state; - if (enter_state == FINISH) { - grpc_debug("%s RPC FINISH -> DELETED", name); - new_state = FINISH; - } else { - grpc_debug("%s RPC: %s -> PROCESS", name, - call_states[this->state]); - new_state = PROCESS; - } /* - * We are either in state CREATE, MORE or FINISH. If CREATE or - * MORE move back to PROCESS, otherwise we are cleaning up - * (FINISH) so leave it in that state. Run the callback on the - * main threadmaster/pthread; and wait for expected transition - * from main thread. If transition is to FINISH->DELETED. - * delete us. - * - * We update the state prior to scheduling the callback which - * may then update the state in the master pthread. Then we - * obtain the lock in the condvar-check-loop as the callback - * will be modifying updating the state value. + * We enter in either CREATE or MORE state, and transition to + * PROCESS state. + */ + this->entered_state = this->state; + this->state = PROCESS; + grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name, + call_states[this->entered_state], + call_states[this->state]); + /* + * We schedule the callback on the main pthread, and wait for + * the state to transition out of the PROCESS state. The new + * state will either be MORE or FINISH. It will always be FINISH + * for Unary RPCs. */ - this->state = new_state; thread_add_event(main_master, c_callback, (void *)this, 0, NULL); + pthread_mutex_lock(&this->cmux); - while (this->state == new_state) + while (this->state == PROCESS) pthread_cond_wait(&this->cond, &this->cmux); pthread_mutex_unlock(&this->cmux); - if (this->state == DELETED) { - grpc_debug("%s RPC: -> [DELETED]", name); - delete this; - return DELETED; - } - return this->state; - } - - void do_request(::frr::Northbound::AsyncService *service, - ::grpc::ServerCompletionQueue *cq, - bool no_copy) override - { - grpc_debug("%s, posting a request for: %s", __func__, name); - if (requestf) { - NewRpcState<Q, S> *copy = - no_copy ? this - : new NewRpcState(cdb, requestf, - callback, name); - (service->*requestf)(©->ctx, ©->request, - ©->responder, cq, cq, copy); - } else { - NewRpcState<Q, S> *copy = - no_copy ? this - : new NewRpcState(cdb, requestsf, - callback, name); - (service->*requestsf)(©->ctx, ©->request, - ©->async_responder, cq, cq, - copy); + grpc_debug("%s RPC in %s on grpc-io-thread", name, + call_states[this->state]); + + if (this->state == FINISH) { + /* + * Server is done (FINISH) so prep to receive a new + * request of this type. We could do this earlier but + * that would mean we could be handling multiple same + * type requests in parallel without limit. + */ + this->do_request(service, cq, false); } + return true; } + protected: + virtual CallState run_mainthread(struct thread *thread) = 0; static void c_callback(struct thread *thread) { - auto _tag = static_cast<NewRpcState<Q, S> *>(thread->arg); + auto _tag = static_cast<RpcStateBase *>(thread->arg); /* * We hold the lock until the callback finishes and has updated * _tag->state, then we signal done and release. @@ -225,11 +206,12 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase pthread_mutex_lock(&_tag->cmux); CallState enter_state = _tag->state; - grpc_debug("%s RPC running on main thread", _tag->name); + grpc_debug("%s RPC: running %s on main thread", _tag->name, + call_states[enter_state]); - _tag->callback(_tag); + _tag->state = _tag->run_mainthread(thread); - grpc_debug("%s RPC: %s -> %s", _tag->name, + grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name, call_states[enter_state], call_states[_tag->state]); pthread_cond_signal(&_tag->cond); @@ -237,23 +219,118 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase return; } - const char *name; grpc::ServerContext ctx; + pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + CallState state = CREATE; + CallState entered_state; + + public: + const char *name; +}; + +/* + * The UnaryRpcState class is used to track the execution of a Unary RPC. + * + * Template Args: + * Q - the request type for a given unary RPC + * S - the response type for a given unary RPC + */ +template <typename Q, typename S> class UnaryRpcState : public RpcStateBase +{ + public: + typedef void (frr::Northbound::AsyncService::*reqfunc_t)( + ::grpc::ServerContext *, Q *, + ::grpc::ServerAsyncResponseWriter<S> *, + ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, + void *); + + UnaryRpcState(Candidates *cdb, reqfunc_t rfunc, + grpc::Status (*cb)(UnaryRpcState<Q, S> *), + const char *name) + : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb), + responder(&ctx){}; + + void do_request(::frr::Northbound::AsyncService *service, + ::grpc::ServerCompletionQueue *cq, + bool no_copy) override + { + grpc_debug("%s, posting a request for: %s", __func__, name); + auto copy = no_copy ? this + : new UnaryRpcState(cdb, requestf, callback, + name); + (service->*requestf)(©->ctx, ©->request, + ©->responder, cq, cq, copy); + } + + CallState run_mainthread(struct thread *thread) override + { + // Unary RPC are always finished, see "Unary" :) + grpc::Status status = this->callback(this); + responder.Finish(response, status, this); + return FINISH; + } + + Candidates *cdb; + Q request; S response; grpc::ServerAsyncResponseWriter<S> responder; - grpc::ServerAsyncWriter<S> async_responder; - Candidates *cdb; - void (*callback)(NewRpcState<Q, S> *); + grpc::Status (*callback)(UnaryRpcState<Q, S> *); reqfunc_t requestf = NULL; - reqsfunc_t requestsf = NULL; +}; - pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; - pthread_cond_t cond = PTHREAD_COND_INITIALIZER; - void *context = 0; +/* + * The StreamRpcState class is used to track the execution of a Streaming RPC. + * + * Template Args: + * Q - the request type for a given streaming RPC + * S - the response type for a given streaming RPC + * X - the type used to track the streaming state + */ +template <typename Q, typename S, typename X> +class StreamRpcState : public RpcStateBase +{ + public: + typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( + ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *, + ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, + void *); - CallState state = CREATE; + StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *), + const char *name) + : RpcStateBase(name), requestsf(rfunc), callback(cb), + async_responder(&ctx){}; + + void do_request(::frr::Northbound::AsyncService *service, + ::grpc::ServerCompletionQueue *cq, + bool no_copy) override + { + grpc_debug("%s, posting a request for: %s", __func__, name); + auto copy = + no_copy ? this + : new StreamRpcState(requestsf, callback, name); + (service->*requestsf)(©->ctx, ©->request, + ©->async_responder, cq, cq, copy); + } + + CallState run_mainthread(struct thread *thread) override + { + if (this->callback(this)) + return MORE; + else + return FINISH; + } + + Q request; + S response; + grpc::ServerAsyncWriter<S> async_responder; + + bool (*callback)(StreamRpcState<Q, S, X> *); + reqsfunc_t requestsf = NULL; + + X context; }; // ------------------------------------------------------ @@ -472,15 +549,11 @@ static grpc::Status get_path(frr::DataTree *dt, const std::string &path, // RPC Callback Functions: run on main thread // ------------------------------------------------------ -void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest, - frr::GetCapabilitiesResponse> *tag) +grpc::Status HandleUnaryGetCapabilities( + UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); // Response: string frr_version = 1; tag->response.set_frr_version(FRR_VERSION); @@ -506,30 +579,24 @@ void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest, tag->response.add_supported_encodings(frr::JSON); tag->response.add_supported_encodings(frr::XML); - /* Should we do this in the async process call? */ - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - - /* Indicate we are done. */ - tag->state = FINISH; + return grpc::Status::OK; } -void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag) -{ - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); +// Define the context variable type for this streaming handler +typedef std::list<std::string> GetContextType; - if (tag->state == FINISH) { - delete static_cast<std::list<std::string> *>(tag->context); - tag->state = DELETED; - return; - } +bool HandleStreamingGet( + StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag) +{ + grpc_debug("%s: entered", __func__); - if (!tag->context) { - /* Creating, first time called for this RPC */ - auto mypaths = new std::list<std::string>(); - tag->context = mypaths; + auto mypathps = &tag->context; + if (tag->is_initial_process()) { + // Fill our context container first time through + grpc_debug("%s: initialize streaming state", __func__); auto paths = tag->request.path(); for (const std::string &path : paths) { - mypaths->push_back(std::string(path)); + mypathps->push_back(std::string(path)); } } @@ -540,11 +607,9 @@ void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag) // Request: bool with_defaults = 3; bool with_defaults = tag->request.with_defaults(); - auto mypathps = static_cast<std::list<std::string> *>(tag->context); if (mypathps->empty()) { tag->async_responder.Finish(grpc::Status::OK, tag); - tag->state = FINISH; - return; + return false; } frr::GetResponse response; @@ -562,86 +627,57 @@ void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag) if (!status.ok()) { tag->async_responder.WriteAndFinish( response, grpc::WriteOptions(), status, tag); - tag->state = FINISH; - return; + return false; } mypathps->pop_back(); if (mypathps->empty()) { tag->async_responder.WriteAndFinish( response, grpc::WriteOptions(), grpc::Status::OK, tag); - tag->state = FINISH; + return false; } else { tag->async_responder.Write(response, tag); - tag->state = MORE; + return true; } } -void HandleUnaryCreateCandidate(NewRpcState<frr::CreateCandidateRequest, - frr::CreateCandidateResponse> *tag) +grpc::Status HandleUnaryCreateCandidate( + UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); struct candidate *candidate = tag->cdb->create_candidate(); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, - "Can't create candidate configuration"), - tag); - } else { - tag->response.set_candidate_id(candidate->id); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - - tag->state = FINISH; + if (!candidate) + return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, + "Can't create candidate configuration"); + tag->response.set_candidate_id(candidate->id); + return grpc::Status::OK; } -void HandleUnaryDeleteCandidate(NewRpcState<frr::DeleteCandidateRequest, - frr::DeleteCandidateResponse> *tag) +grpc::Status HandleUnaryDeleteCandidate( + UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); - struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - } else { - tag->cdb->delete_candidate(candidate); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - tag->state = FINISH; + if (!tag->cdb->contains(candidate_id)) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); + tag->cdb->delete_candidate(candidate_id); + return grpc::Status::OK; } -void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest, - frr::UpdateCandidateResponse> *tag) +grpc::Status HandleUnaryUpdateCandidate( + UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); @@ -649,58 +685,33 @@ void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest, struct candidate *candidate = tag->cdb->get_candidate(candidate_id); if (!candidate) - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - else if (candidate->transaction) - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "candidate is in the middle of a transaction"), - tag); - else if (nb_candidate_update(candidate->config) != NB_OK) - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::INTERNAL, - "failed to update candidate configuration"), - tag); - - else - tag->responder.Finish(tag->response, grpc::Status::OK, tag); + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); + if (candidate->transaction) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "candidate is in the middle of a transaction"); + if (nb_candidate_update(candidate->config) != NB_OK) + return grpc::Status(grpc::StatusCode::INTERNAL, + "failed to update candidate configuration"); - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryEditCandidate( - NewRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> *tag) +grpc::Status HandleUnaryEditCandidate( + UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } + if (!candidate) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); struct nb_config *candidate_tmp = nb_config_dup(candidate->config); @@ -710,15 +721,9 @@ void HandleUnaryEditCandidate( pv.value().c_str()) != 0) { nb_config_free(candidate_tmp); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Failed to update \"" + pv.path() - + "\""), - tag); - - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Failed to update \"" + pv.path() + + "\""); } } @@ -726,36 +731,23 @@ void HandleUnaryEditCandidate( for (const frr::PathValue &pv : pvs) { if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) { nb_config_free(candidate_tmp); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Failed to remove \"" + pv.path() - + "\""), - tag); - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Failed to remove \"" + pv.path() + + "\""); } } // No errors, accept all changes. nb_config_replace(candidate->config, candidate_tmp, false); - - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest, - frr::LoadToCandidateResponse> *tag) +grpc::Status HandleUnaryLoadToCandidate( + UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); @@ -765,59 +757,31 @@ void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest, // Request: DataTree config = 3; auto config = tag->request.config(); - struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } + if (!candidate) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); struct lyd_node *dnode = dnode_from_data_tree(&config, true); - if (!dnode) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to parse the configuration"), - tag); - tag->state = FINISH; - return; - } + if (!dnode) + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to parse the configuration"); struct nb_config *loaded_config = nb_config_new(dnode); - if (load_type == frr::LoadToCandidateRequest::REPLACE) nb_config_replace(candidate->config, loaded_config, false); - else if (nb_config_merge(candidate->config, loaded_config, false) - != NB_OK) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to merge the loaded configuration"), - tag); - tag->state = FINISH; - return; - } + else if (nb_config_merge(candidate->config, loaded_config, false) != + NB_OK) + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to merge the loaded configuration"); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryCommit( - NewRpcState<frr::CommitRequest, frr::CommitResponse> *tag) +grpc::Status +HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); // Request: uint32 candidate_id = 1; uint32_t candidate_id = tag->request.candidate_id(); @@ -831,15 +795,9 @@ void HandleUnaryCommit( // Find candidate configuration. struct candidate *candidate = tag->cdb->get_candidate(candidate_id); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } + if (!candidate) + return grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"); int ret = NB_OK; uint32_t transaction_id = 0; @@ -848,29 +806,17 @@ void HandleUnaryCommit( switch (phase) { case frr::CommitRequest::PREPARE: case frr::CommitRequest::ALL: - if (candidate->transaction) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "candidate is in the middle of a transaction"), - tag); - tag->state = FINISH; - return; - } + if (candidate->transaction) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "candidate is in the middle of a transaction"); break; case frr::CommitRequest::ABORT: case frr::CommitRequest::APPLY: - if (!candidate->transaction) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "no transaction in progress"), - tag); - tag->state = FINISH; - return; - } + if (!candidate->transaction) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "no transaction in progress"); break; default: break; @@ -950,53 +896,30 @@ void HandleUnaryCommit( if (strlen(errmsg) > 0) tag->response.set_error_message(errmsg); - tag->responder.Finish(tag->response, status, tag); - tag->state = FINISH; + return status; } -void HandleUnaryLockConfig( - NewRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) +grpc::Status HandleUnaryLockConfig( + UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - if (nb_running_lock(NB_CLIENT_GRPC, NULL)) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, - "running configuration is locked already"), - tag); - } else { - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - tag->state = FINISH; + if (nb_running_lock(NB_CLIENT_GRPC, NULL)) + return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, + "running configuration is locked already"); + return grpc::Status::OK; } -void HandleUnaryUnlockConfig( - NewRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag) +grpc::Status HandleUnaryUnlockConfig( + UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); - if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "failed to unlock the running configuration"), - tag); - } else { - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - } - tag->state = FINISH; + if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) + return grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "failed to unlock the running configuration"); + return grpc::Status::OK; } static void list_transactions_cb(void *arg, int transaction_id, @@ -1010,45 +933,34 @@ static void list_transactions_cb(void *arg, int transaction_id, std::string(date), std::string(comment))); } -void HandleStreamingListTransactions( - NewRpcState<frr::ListTransactionsRequest, frr::ListTransactionsResponse> - *tag) -{ - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - delete static_cast<std::list<std::tuple< - int, std::string, std::string, std::string>> *>( - tag->context); - tag->state = DELETED; - return; - } +// Define the context variable type for this streaming handler +typedef std::list<std::tuple<int, std::string, std::string, std::string>> + ListTransactionsContextType; - if (!tag->context) { - /* Creating, first time called for this RPC */ - auto new_list = - new std::list<std::tuple<int, std::string, std::string, - std::string>>(); - tag->context = new_list; - nb_db_transactions_iterate(list_transactions_cb, tag->context); +bool HandleStreamingListTransactions( + StreamRpcState<frr::ListTransactionsRequest, + frr::ListTransactionsResponse, + ListTransactionsContextType> *tag) +{ + grpc_debug("%s: entered", __func__); - new_list->push_back(std::make_tuple( + auto list = &tag->context; + if (tag->is_initial_process()) { + grpc_debug("%s: initialize streaming state", __func__); + // Fill our context container first time through + nb_db_transactions_iterate(list_transactions_cb, list); + list->push_back(std::make_tuple( 0xFFFF, std::string("fake client"), std::string("fake date"), std::string("fake comment"))); - new_list->push_back( - std::make_tuple(0xFFFE, std::string("fake client2"), - std::string("fake date"), - std::string("fake comment2"))); + list->push_back(std::make_tuple(0xFFFE, + std::string("fake client2"), + std::string("fake date"), + std::string("fake comment2"))); } - auto list = static_cast<std::list< - std::tuple<int, std::string, std::string, std::string>> *>( - tag->context); - if (list->empty()) { tag->async_responder.Finish(grpc::Status::OK, tag); - tag->state = FINISH; - return; + return false; } auto item = list->back(); @@ -1071,22 +983,18 @@ void HandleStreamingListTransactions( if (list->empty()) { tag->async_responder.WriteAndFinish( response, grpc::WriteOptions(), grpc::Status::OK, tag); - tag->state = FINISH; + return false; } else { tag->async_responder.Write(response, tag); - tag->state = MORE; + return true; } } -void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest, - frr::GetTransactionResponse> *tag) +grpc::Status HandleUnaryGetTransaction( + UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse> + *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); // Request: uint32 transaction_id = 1; uint32_t transaction_id = tag->request.transaction_id(); @@ -1102,15 +1010,9 @@ void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest, // Load configuration from the transactions database. nb_config = nb_db_transaction_load(transaction_id); - if (!nb_config) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Transaction not found"), - tag); - tag->state = FINISH; - return; - } + if (!nb_config) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Transaction not found"); // Response: DataTree config = 1; auto config = tag->response.mutable_config(); @@ -1121,29 +1023,19 @@ void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest, encoding2lyd_format(encoding), with_defaults) != 0) { nb_config_free(nb_config); - tag->responder.Finish(tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to dump data"), - tag); - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to dump data"); } nb_config_free(nb_config); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - tag->state = FINISH; + return grpc::Status::OK; } -void HandleUnaryExecute( - NewRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) +grpc::Status HandleUnaryExecute( + UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) { - grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - - if (tag->state == FINISH) { - tag->state = DELETED; - return; - } + grpc_debug("%s: entered", __func__); struct nb_node *nb_node; struct list *input_list; @@ -1158,26 +1050,14 @@ void HandleUnaryExecute( grpc_debug("%s(path: \"%s\")", __func__, xpath); - if (tag->request.path().empty()) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Data path is empty"), - tag); - tag->state = FINISH; - return; - } + if (tag->request.path().empty()) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Data path is empty"); nb_node = nb_node_find(xpath); - if (!nb_node) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Unknown data path"), - tag); - tag->state = FINISH; - return; - } + if (!nb_node) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Unknown data path"); input_list = yang_data_list_new(); output_list = yang_data_list_new(); @@ -1199,12 +1079,7 @@ void HandleUnaryExecute( list_delete(&input_list); list_delete(&output_list); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"), - tag); - tag->state = FINISH; - return; + return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"); } // Process output parameters. @@ -1219,8 +1094,7 @@ void HandleUnaryExecute( list_delete(&input_list); list_delete(&output_list); - tag->responder.Finish(tag->response, grpc::Status::OK, tag); - tag->state = FINISH; + return grpc::Status::OK; } // ------------------------------------------------------ @@ -1230,18 +1104,19 @@ void HandleUnaryExecute( #define REQUEST_NEWRPC(NAME, cdb) \ do { \ - auto _rpcState = new NewRpcState<frr::NAME##Request, \ - frr::NAME##Response>( \ + auto _rpcState = new UnaryRpcState<frr::NAME##Request, \ + frr::NAME##Response>( \ (cdb), &frr::Northbound::AsyncService::Request##NAME, \ &HandleUnary##NAME, #NAME); \ _rpcState->do_request(&service, cq.get(), true); \ } while (0) -#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \ +#define REQUEST_NEWRPC_STREAMING(NAME) \ do { \ - auto _rpcState = new NewRpcState<frr::NAME##Request, \ - frr::NAME##Response>( \ - (cdb), &frr::Northbound::AsyncService::Request##NAME, \ + auto _rpcState = new StreamRpcState<frr::NAME##Request, \ + frr::NAME##Response, \ + NAME##ContextType>( \ + &frr::Northbound::AsyncService::Request##NAME, \ &HandleStreaming##NAME, #NAME); \ _rpcState->do_request(&service, cq.get(), true); \ } while (0) @@ -1280,7 +1155,7 @@ static void *grpc_pthread_start(void *arg) grpc_running = true; - /* Schedule all RPC handlers */ + /* Schedule unary RPC handlers */ REQUEST_NEWRPC(GetCapabilities, NULL); REQUEST_NEWRPC(CreateCandidate, &candidates); REQUEST_NEWRPC(DeleteCandidate, &candidates); @@ -1292,8 +1167,10 @@ static void *grpc_pthread_start(void *arg) REQUEST_NEWRPC(LockConfig, NULL); REQUEST_NEWRPC(UnlockConfig, NULL); REQUEST_NEWRPC(Execute, NULL); - REQUEST_NEWRPC_STREAMING(Get, NULL); - REQUEST_NEWRPC_STREAMING(ListTransactions, NULL); + + /* Schedule streaming RPC handlers */ + REQUEST_NEWRPC_STREAMING(Get); + REQUEST_NEWRPC_STREAMING(ListTransactions); zlog_notice("gRPC server listening on %s", server_address.str().c_str()); @@ -1301,7 +1178,7 @@ static void *grpc_pthread_start(void *arg) /* Process inbound RPCs */ bool ok; void *tag; - while (grpc_running) { + while (true) { if (!cq->Next(&tag, &ok)) { grpc_debug("%s: CQ empty exiting", __func__); break; @@ -1310,25 +1187,18 @@ static void *grpc_pthread_start(void *arg) grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag, ok); - if (!ok || !grpc_running) { + if (!ok) { delete static_cast<RpcStateBase *>(tag); break; } RpcStateBase *rpc = static_cast<RpcStateBase *>(tag); - CallState state = rpc->doCallback(); - grpc_debug("%s: callback returned RPC State: %s", __func__, - call_states[state]); - - /* - * Our side is done (FINISH) receive new requests of this type - * We could do this earlier but that would mean we could be - * handling multiple same type requests in parallel. We expect - * to be called back once more in the FINISH state (from the - * user indicating Finish() for cleanup. - */ - if (state == FINISH && grpc_running) - rpc->do_request(&service, cq.get(), false); + if (rpc->get_state() != FINISH) + rpc->run(&service, cq.get()); + else { + grpc_debug("%s RPC FINISH -> [delete]", rpc->name); + delete rpc; + } } /* This was probably done for us to get here, but let's be safe */ @@ -1400,6 +1270,11 @@ static int frr_grpc_finish(void) grpc_debug("%s: joining and destroy grpc thread", __func__); pthread_join(fpt->thread, NULL); frr_pthread_destroy(fpt); + + // Fix protobuf 'memory leaks' during shutdown. + // https://groups.google.com/g/protobuf/c/4y_EmQiCGgs + google::protobuf::ShutdownProtobufLibrary(); + return 0; } diff --git a/lib/prefix.c b/lib/prefix.c index 90ab48a13b..89c5be8f38 100644 --- a/lib/prefix.c +++ b/lib/prefix.c @@ -1071,6 +1071,26 @@ const char *prefix2str(union prefixconstptr pu, char *str, int size) return str; } +static ssize_t prefixhost2str(struct fbuf *fbuf, union prefixconstptr pu) +{ + const struct prefix *p = pu.p; + char buf[PREFIX2STR_BUFFER]; + + switch (p->family) { + case AF_INET: + case AF_INET6: + inet_ntop(p->family, &p->u.prefix, buf, sizeof(buf)); + return bputs(fbuf, buf); + + case AF_ETHERNET: + prefix_mac2str(&p->u.prefix_eth, buf, sizeof(buf)); + return bputs(fbuf, buf); + + default: + return bprintfrr(fbuf, "{prefix.af=%dPF}", p->family); + } +} + void prefix_mcast_inet4_dump(const char *onfail, struct in_addr addr, char *buf, int buf_size) { @@ -1458,13 +1478,24 @@ printfrr_ext_autoreg_p("FX", printfrr_pfx); static ssize_t printfrr_pfx(struct fbuf *buf, struct printfrr_eargs *ea, const void *ptr) { - char cbuf[PREFIX_STRLEN]; + bool host_only = false; + + if (ea->fmt[0] == 'h') { + ea->fmt++; + host_only = true; + } if (!ptr) return bputs(buf, "(null)"); - prefix2str(ptr, cbuf, sizeof(cbuf)); - return bputs(buf, cbuf); + if (host_only) + return prefixhost2str(buf, (struct prefix *)ptr); + else { + char cbuf[PREFIX_STRLEN]; + + prefix2str(ptr, cbuf, sizeof(cbuf)); + return bputs(buf, cbuf); + } } printfrr_ext_autoreg_p("PSG4", printfrr_psg); diff --git a/lib/routemap.c b/lib/routemap.c index 9afe18d10b..46161fd817 100644 --- a/lib/routemap.c +++ b/lib/routemap.c @@ -100,6 +100,7 @@ static void route_map_del_plist_entries(afi_t afi, struct prefix_list_entry *entry); static struct hash *route_map_get_dep_hash(route_map_event_t event); +static void route_map_free_map(struct route_map *map); struct route_map_match_set_hooks rmap_match_set_hook; @@ -566,15 +567,8 @@ static bool route_map_hash_cmp(const void *p1, const void *p2) const struct route_map *map1 = p1; const struct route_map *map2 = p2; - if (map1->deleted == map2->deleted) { - if (map1->name && map2->name) { - if (!strcmp(map1->name, map2->name)) { - return true; - } - } else if (!map1->name && !map2->name) { - return true; - } - } + if (!strcmp(map1->name, map2->name)) + return true; return false; } @@ -636,13 +630,25 @@ static struct route_map *route_map_new(const char *name) /* Add new name to route_map. */ static struct route_map *route_map_add(const char *name) { - struct route_map *map; + struct route_map *map, *exist; struct route_map_list *list; map = route_map_new(name); list = &route_map_master; - /* Add map to the hash */ + /* + * Add map to the hash + * + * If the map already exists in the hash, then we know that + * FRR is now in a sequence of delete/create. + * All FRR needs to do here is set the to_be_processed + * bit (to inherit from the old one + */ + exist = hash_release(route_map_master_hash, map); + if (exist) { + map->to_be_processed = exist->to_be_processed; + route_map_free_map(exist); + } hash_get(route_map_master_hash, map, hash_alloc_intern); /* Add new entry to the head of the list to match how it is added in the @@ -752,11 +758,15 @@ struct route_map *route_map_lookup_by_name(const char *name) if (!name) return NULL; - // map.deleted is 0 via memset + // map.deleted is false via memset memset(&tmp_map, 0, sizeof(struct route_map)); tmp_map.name = XSTRDUP(MTYPE_ROUTE_MAP_NAME, name); map = hash_lookup(route_map_master_hash, &tmp_map); XFREE(MTYPE_ROUTE_MAP_NAME, tmp_map.name); + + if (map && map->deleted) + return NULL; + return map; } diff --git a/lib/sockunion.c b/lib/sockunion.c index 006ac142aa..9763b38e28 100644 --- a/lib/sockunion.c +++ b/lib/sockunion.c @@ -351,21 +351,6 @@ int sockopt_ttl(int family, int sock, int ttl) return 0; } -/* - * This function called setsockopt(.., TCP_CORK,...) - * Which on linux is a no-op since it is enabled by - * default and on BSD it uses TCP_NOPUSH to do - * the same thing( which it was not configured to - * use). This cleanup of the api occurred on 8/1/17 - * I imagine if after more than 1 year of no-one - * complaining, and a major upgrade release we - * can deprecate and remove this function call - */ -int sockopt_cork(int sock, int onoff) -{ - return 0; -} - int sockopt_minttl(int family, int sock, int minttl) { #ifdef IP_MINTTL diff --git a/lib/sockunion.h b/lib/sockunion.h index 9e6719ccf9..8ace3e4781 100644 --- a/lib/sockunion.h +++ b/lib/sockunion.h @@ -95,7 +95,6 @@ extern int sockunion_bind(int sock, union sockunion *, unsigned short, union sockunion *); extern int sockopt_ttl(int family, int sock, int ttl); extern int sockopt_minttl(int family, int sock, int minttl); -extern int sockopt_cork(int sock, int onoff); extern int sockunion_socket(const union sockunion *su); extern const char *inet_sutop(const union sockunion *su, char *str); extern enum connect_result sockunion_connect(int fd, const union sockunion *su, diff --git a/lib/typerb.c b/lib/typerb.c index e1346df191..fe142ff354 100644 --- a/lib/typerb.c +++ b/lib/typerb.c @@ -468,6 +468,28 @@ struct rb_entry *typed_rb_next(const struct rb_entry *rbe_const) return rbe; } +struct rb_entry *typed_rb_prev(const struct rb_entry *rbe_const) +{ + struct rb_entry *rbe = (struct rb_entry *)rbe_const; + + if (RBE_LEFT(rbe)) { + rbe = RBE_LEFT(rbe); + while (RBE_RIGHT(rbe)) + rbe = RBE_RIGHT(rbe); + } else { + if (RBE_PARENT(rbe) && (rbe == RBE_RIGHT(RBE_PARENT(rbe)))) + rbe = RBE_PARENT(rbe); + else { + while (RBE_PARENT(rbe) + && (rbe == RBE_LEFT(RBE_PARENT(rbe)))) + rbe = RBE_PARENT(rbe); + rbe = RBE_PARENT(rbe); + } + } + + return rbe; +} + struct rb_entry *typed_rb_min(const struct rbt_tree *rbt) { struct rb_entry *rbe = RBH_ROOT(rbt); @@ -481,6 +503,19 @@ struct rb_entry *typed_rb_min(const struct rbt_tree *rbt) return parent; } +struct rb_entry *typed_rb_max(const struct rbt_tree *rbt) +{ + struct rb_entry *rbe = RBH_ROOT(rbt); + struct rb_entry *parent = NULL; + + while (rbe != NULL) { + parent = rbe; + rbe = RBE_RIGHT(rbe); + } + + return parent; +} + bool typed_rb_member(const struct typed_rb_root *rbt, const struct typed_rb_entry *rbe) { diff --git a/lib/typerb.h b/lib/typerb.h index 75a1de77b3..8ac1821742 100644 --- a/lib/typerb.h +++ b/lib/typerb.h @@ -62,6 +62,8 @@ const struct typed_rb_entry *typed_rb_find_lt(const struct typed_rb_root *rbt, const struct typed_rb_entry *a, const struct typed_rb_entry *b)); struct typed_rb_entry *typed_rb_min(const struct typed_rb_root *rbt); +struct typed_rb_entry *typed_rb_max(const struct typed_rb_root *rbt); +struct typed_rb_entry *typed_rb_prev(const struct typed_rb_entry *rbe); struct typed_rb_entry *typed_rb_next(const struct typed_rb_entry *rbe); bool typed_rb_member(const struct typed_rb_root *rbt, const struct typed_rb_entry *rbe); @@ -135,12 +137,32 @@ macro_pure const type *prefix ## _const_next(const struct prefix##_head *h, \ return container_of_null(re, type, field.re); \ } \ TYPESAFE_FIRST_NEXT(prefix, type) \ +macro_pure const type *prefix ## _const_last(const struct prefix##_head *h) \ +{ \ + const struct typed_rb_entry *re; \ + re = typed_rb_max(&h->rr); \ + return container_of_null(re, type, field.re); \ +} \ +macro_pure const type *prefix ## _const_prev(const struct prefix##_head *h, \ + const type *item) \ +{ \ + const struct typed_rb_entry *re; \ + re = typed_rb_prev(&item->field.re); \ + return container_of_null(re, type, field.re); \ +} \ +TYPESAFE_LAST_PREV(prefix, type) \ macro_pure type *prefix ## _next_safe(struct prefix##_head *h, type *item) \ { \ struct typed_rb_entry *re; \ re = item ? typed_rb_next(&item->field.re) : NULL; \ return container_of_null(re, type, field.re); \ } \ +macro_pure type *prefix ## _prev_safe(struct prefix##_head *h, type *item) \ +{ \ + struct typed_rb_entry *re; \ + re = item ? typed_rb_prev(&item->field.re) : NULL; \ + return container_of_null(re, type, field.re); \ +} \ macro_pure size_t prefix ## _count(const struct prefix##_head *h) \ { \ return h->rr.count; \ diff --git a/lib/typesafe.h b/lib/typesafe.h index b284397d98..06fdc52e78 100644 --- a/lib/typesafe.h +++ b/lib/typesafe.h @@ -43,6 +43,22 @@ extern "C" { item; \ item = from, from = prefix##_next_safe(head, from)) +/* reverse direction, only supported by a few containers */ + +#define frr_rev_each(prefix, head, item) \ + for (item = prefix##_last(head); item; \ + item = prefix##_prev(head, item)) +#define frr_rev_each_safe(prefix, head, item) \ + for (typeof(prefix##_prev_safe(head, NULL)) prefix##_safe = \ + prefix##_prev_safe(head, \ + (item = prefix##_last(head))); \ + item; \ + item = prefix##_safe, \ + prefix##_safe = prefix##_prev_safe(head, prefix##_safe)) +#define frr_rev_each_from(prefix, head, item, from) \ + for (item = from, from = prefix##_prev_safe(head, item); \ + item; \ + item = from, from = prefix##_prev_safe(head, from)) /* non-const variants. these wrappers are the same for all the types, so * bundle them together here. @@ -57,6 +73,16 @@ macro_pure type *prefix ## _next(struct prefix##_head *h, type *item) \ return (type *)prefix ## _const_next(h, item); \ } \ /* ... */ +#define TYPESAFE_LAST_PREV(prefix, type) \ +macro_pure type *prefix ## _last(struct prefix##_head *h) \ +{ \ + return (type *)prefix ## _const_last(h); \ +} \ +macro_pure type *prefix ## _prev(struct prefix##_head *h, type *item) \ +{ \ + return (type *)prefix ## _const_prev(h, item); \ +} \ +/* ... */ #define TYPESAFE_FIND(prefix, type) \ macro_inline type *prefix ## _find(struct prefix##_head *h, \ const type *item) \ @@ -398,12 +424,34 @@ macro_pure const type *prefix ## _const_next(const struct prefix##_head *h, \ return container_of(ditem->next, type, field.di); \ } \ TYPESAFE_FIRST_NEXT(prefix, type) \ +macro_pure const type *prefix ## _const_last(const struct prefix##_head *h) \ +{ \ + const struct dlist_item *ditem = h->dh.hitem.prev; \ + if (ditem == &h->dh.hitem) \ + return NULL; \ + return container_of(ditem, type, field.di); \ +} \ +macro_pure const type *prefix ## _const_prev(const struct prefix##_head *h, \ + const type *item) \ +{ \ + const struct dlist_item *ditem = &item->field.di; \ + if (ditem->prev == &h->dh.hitem) \ + return NULL; \ + return container_of(ditem->prev, type, field.di); \ +} \ +TYPESAFE_LAST_PREV(prefix, type) \ macro_pure type *prefix ## _next_safe(struct prefix##_head *h, type *item) \ { \ if (!item) \ return NULL; \ return prefix ## _next(h, item); \ } \ +macro_pure type *prefix ## _prev_safe(struct prefix##_head *h, type *item) \ +{ \ + if (!item) \ + return NULL; \ + return prefix ## _prev(h, item); \ +} \ macro_pure size_t prefix ## _count(const struct prefix##_head *h) \ { \ return h->dh.count; \ diff --git a/lib/wheel.c b/lib/wheel.c index 463410bea4..cdf738a137 100644 --- a/lib/wheel.c +++ b/lib/wheel.c @@ -40,7 +40,6 @@ static void wheel_timer_thread_helper(struct thread *t) void *data; wheel = THREAD_ARG(t); - THREAD_OFF(wheel->timer); wheel->curr_slot += wheel->slots_to_skip; diff --git a/lib/zclient.c b/lib/zclient.c index 930adf6a7a..f6c5a8af08 100644 --- a/lib/zclient.c +++ b/lib/zclient.c @@ -1924,7 +1924,8 @@ const char *zapi_nexthop2str(const struct zapi_nexthop *znh, char *buf, /* * Decode the nexthop-tracking update message */ -bool zapi_nexthop_update_decode(struct stream *s, struct zapi_route *nhr) +bool zapi_nexthop_update_decode(struct stream *s, struct prefix *match, + struct zapi_route *nhr) { uint32_t i; @@ -1932,6 +1933,22 @@ bool zapi_nexthop_update_decode(struct stream *s, struct zapi_route *nhr) STREAM_GETL(s, nhr->message); STREAM_GETW(s, nhr->safi); + STREAM_GETW(s, match->family); + STREAM_GETC(s, match->prefixlen); + /* + * What we got told to match against + */ + switch (match->family) { + case AF_INET: + STREAM_GET(&match->u.prefix4.s_addr, s, IPV4_MAX_BYTELEN); + break; + case AF_INET6: + STREAM_GET(&match->u.prefix6, s, IPV6_MAX_BYTELEN); + break; + } + /* + * What we matched against + */ STREAM_GETW(s, nhr->prefix.family); STREAM_GETC(s, nhr->prefix.prefixlen); switch (nhr->prefix.family) { diff --git a/lib/zclient.h b/lib/zclient.h index ca62b1afeb..092754f602 100644 --- a/lib/zclient.h +++ b/lib/zclient.h @@ -1111,7 +1111,17 @@ int zapi_nexthop_from_nexthop(struct zapi_nexthop *znh, const struct nexthop *nh); int zapi_backup_nexthop_from_nexthop(struct zapi_nexthop *znh, const struct nexthop *nh); -extern bool zapi_nexthop_update_decode(struct stream *s, +/* + * match -> is the prefix that the calling daemon asked to be matched + * against. + * nhr->prefix -> is the actual prefix that was matched against in the + * rib itself. + * + * This distinction is made because a LPM can be made if there is a + * covering route. This way the upper level protocol can make a decision + * point about whether or not it wants to use the match or not. + */ +extern bool zapi_nexthop_update_decode(struct stream *s, struct prefix *match, struct zapi_route *nhr); const char *zapi_nexthop2str(const struct zapi_nexthop *znh, char *buf, int bufsize); |
