diff options
| author | Donatas Abraitis <donatas@opensourcerouting.org> | 2022-03-08 09:22:07 +0200 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-03-08 09:22:07 +0200 | 
| commit | fe894829f909c0f0b602f36c8442c40956f5861f (patch) | |
| tree | aff7a257105eb3f17f3f3018d377b6ef3695a1f7 | |
| parent | 47a5de9c038c17038cef61786fd1216d33ee0f66 (diff) | |
| parent | 5547b4a3293008ea48d179dc1f8590b5a3fcfc83 (diff) | |
Merge pull request #10752 from FRRouting/mergify/bp/stable/8.2/pr-10741
critical fixes for grpc (backport #10741)
| -rw-r--r-- | lib/northbound_grpc.cpp | 142 | 
1 file changed, 92 insertions, 50 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index e227d0385c..f5c2a91a50 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -1,7 +1,7 @@  // +// Copyright (c) 2021-2022, LabN Consulting, L.L.C  // Copyright (C) 2019  NetDEF, Inc.  //                     Renato Westphal -// Copyright (c) 2021, LabN Consulting, L.L.C  //  // This program is free software; you can redistribute it and/or modify it  // under the terms of the GNU General Public License as published by the Free @@ -50,6 +50,8 @@ static struct thread_master *main_master;  static struct frr_pthread *fpt; +static bool grpc_running; +  #define grpc_debug(...)                                                        \  	do {                                                                   \  		if (nb_dbg_client_grpc)                                        \ @@ -96,11 +98,11 @@ class Candidates  	{  		char errmsg[BUFSIZ] = {0}; -		_cdb.erase(c->id);  		nb_config_free(c->config);  		if (c->transaction)  			nb_candidate_commit_abort(c->transaction, errmsg,  						  sizeof(errmsg)); +		_cdb.erase(c->id);  	}  	struct candidate *get_candidate(uint32_t id) @@ -116,9 +118,11 @@ class Candidates  class RpcStateBase  {        public: +	virtual ~RpcStateBase() = default;  	virtual CallState doCallback() = 0;  	virtual void do_request(::frr::Northbound::AsyncService *service, -				::grpc::ServerCompletionQueue *cq) = 0; +				::grpc::ServerCompletionQueue *cq, +				bool no_copy) = 0;  };  /* @@ -188,17 +192,22 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase  	}  	void do_request(::frr::Northbound::AsyncService *service, -			::grpc::ServerCompletionQueue *cq) override +			::grpc::ServerCompletionQueue *cq, +			bool no_copy) override  	{  		grpc_debug("%s, posting a request for: %s", __func__, name);  		if (requestf) {  			NewRpcState<Q, S> *copy = -				new NewRpcState(cdb, requestf, callback, name); +				no_copy ? 
this +					: new NewRpcState(cdb, requestf, +							  callback, name);  (service->*requestf)(&copy->ctx, &copy->request,  					     &copy->responder, cq, cq, copy);  } else {  			NewRpcState<Q, S> *copy = -				new NewRpcState(cdb, requestsf, callback, name); +				no_copy ? this +					: new NewRpcState(cdb, requestsf, +							  callback, name);  			(service->*requestsf)(&copy->ctx, &copy->request,  					      &copy->async_responder, cq, cq,  					      copy); @@ -227,7 +236,6 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase  		pthread_mutex_unlock(&_tag->cmux);  		return 0;  	} -	NewRpcState<Q, S> *orig;  	const char *name;  	grpc::ServerContext ctx; @@ -238,12 +246,12 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase  	Candidates *cdb;  	void (*callback)(NewRpcState<Q, S> *); -	reqfunc_t requestf; -	reqsfunc_t requestsf; +	reqfunc_t requestf = NULL; +	reqsfunc_t requestsf = NULL;  	pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;  	pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -	void *context; +	void *context = 0;  	CallState state = CREATE;  }; @@ -268,10 +276,10 @@ static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)  }  static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path, -			   const std::string &value) +			   const char *value)  { -	LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), -				  value.c_str(), LYD_NEW_PATH_UPDATE, &dnode); +	LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value, +				  LYD_NEW_PATH_UPDATE, &dnode);  	if (err != LY_SUCCESS) {  		flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",  			  __func__, ly_errmsg(ly_native_ctx)); @@ -698,8 +706,8 @@ void HandleUnaryEditCandidate(  	auto pvs = tag->request.update();  	for (const frr::PathValue &pv : pvs) { -		if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), pv.value()) -		    != 0) { +		if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), +				    pv.value().c_str()) != 0) {  			
nb_config_free(candidate_tmp);  			tag->responder.Finish( @@ -1226,7 +1234,7 @@ void HandleUnaryExecute(  						 frr::NAME##Response>(         \  			(cdb), &frr::Northbound::AsyncService::Request##NAME,  \  			&HandleUnary##NAME, #NAME);                            \ -		_rpcState->do_request(service, s_cq);                          \ +		_rpcState->do_request(&service, cq.get(), true);               \  	} while (0)  #define REQUEST_NEWRPC_STREAMING(NAME, cdb)                                    \ @@ -1235,7 +1243,7 @@ void HandleUnaryExecute(  						 frr::NAME##Response>(         \  			(cdb), &frr::Northbound::AsyncService::Request##NAME,  \  			&HandleStreaming##NAME, #NAME);                        \ -		_rpcState->do_request(service, s_cq);                          \ +		_rpcState->do_request(&service, cq.get(), true);               \  	} while (0)  struct grpc_pthread_attr { @@ -1244,8 +1252,8 @@ struct grpc_pthread_attr {  };  // Capture these objects so we can try to shut down cleanly -static std::unique_ptr<grpc::Server> s_server; -static grpc::ServerCompletionQueue *s_cq; +static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER; +static grpc::Server *s_server;  static void *grpc_pthread_start(void *arg)  { @@ -1255,18 +1263,22 @@ static void *grpc_pthread_start(void *arg)  	Candidates candidates;  	grpc::ServerBuilder builder;  	std::stringstream server_address; -	frr::Northbound::AsyncService *service = -		new frr::Northbound::AsyncService(); +	frr::Northbound::AsyncService service;  	frr_pthread_set_name(fpt);  	server_address << "0.0.0.0:" << port;  	builder.AddListeningPort(server_address.str(),  				 grpc::InsecureServerCredentials()); -	builder.RegisterService(service); -	auto cq = builder.AddCompletionQueue(); -	s_cq = cq.get(); -	s_server = builder.BuildAndStart(); +	builder.RegisterService(&service); +	builder.AddChannelArgument( +		GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); +	std::unique_ptr<grpc::ServerCompletionQueue> cq = 
+		builder.AddCompletionQueue(); +	std::unique_ptr<grpc::Server> server = builder.BuildAndStart(); +	s_server = server.get(); + +	grpc_running = true;  	/* Schedule all RPC handlers */  	REQUEST_NEWRPC(GetCapabilities, NULL); @@ -1287,20 +1299,25 @@ static void *grpc_pthread_start(void *arg)  		    server_address.str().c_str());  	/* Process inbound RPCs */ -	while (true) { -		void *tag; -		bool ok; - -		s_cq->Next(&tag, &ok); -		if (!ok) +	bool ok; +	void *tag; +	while (grpc_running) { +		if (!cq->Next(&tag, &ok)) { +			grpc_debug("%s: CQ empty exiting", __func__);  			break; +		} -		grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__, -			   tag, ok); +		grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag, +			   ok); + +		if (!ok || !grpc_running) { +			delete static_cast<RpcStateBase *>(tag); +			break; +		}  		RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);  		CallState state = rpc->doCallback(); -		grpc_debug("%s: Callback returned RPC State: %s", __func__, +		grpc_debug("%s: callback returned RPC State: %s", __func__,  			   call_states[state]);  		/* @@ -1310,10 +1327,30 @@ static void *grpc_pthread_start(void *arg)  		 * to be called back once more in the FINISH state (from the  		 * user indicating Finish() for cleanup.  		 
*/ -		if (state == FINISH) -			rpc->do_request(service, s_cq); +		if (state == FINISH && grpc_running) +			rpc->do_request(&service, cq.get(), false); +	} + +	/* This was probably done for us to get here, but let's be safe */ +	pthread_mutex_lock(&s_server_lock); +	grpc_running = false; +	if (s_server) { +		grpc_debug("%s: shutdown server and CQ", __func__); +		server->Shutdown(); +		s_server = NULL;  	} +	pthread_mutex_unlock(&s_server_lock); +	grpc_debug("%s: shutting down CQ", __func__); +	cq->Shutdown(); + +	grpc_debug("%s: draining the CQ", __func__); +	while (cq->Next(&tag, &ok)) { +		grpc_debug("%s: drain tag %p", __func__, tag); +		delete static_cast<RpcStateBase *>(tag); +	} + +	zlog_info("%s: exiting from grpc pthread", __func__);  	return NULL;  } @@ -1325,6 +1362,8 @@ static int frr_grpc_init(uint port)  		.stop = NULL,  	}; +	grpc_debug("%s: entered", __func__); +  	fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");  	fpt->data = reinterpret_cast<void *>((intptr_t)port); @@ -1340,24 +1379,27 @@ static int frr_grpc_init(uint port)  static int frr_grpc_finish(void)  { -	// Shutdown the grpc server -	if (s_server) { -		s_server->Shutdown(); -		s_cq->Shutdown(); +	grpc_debug("%s: entered", __func__); -		// And drain the queue -		void *ignore; -		bool ok; - -		while (s_cq->Next(&ignore, &ok)) -			; -	} +	if (!fpt) +		return 0; -	if (fpt) { -		pthread_join(fpt->thread, NULL); -		frr_pthread_destroy(fpt); +	/* +	 * Shut the server down here in main thread. This will cause the wait on +	 * the completion queue (cq.Next()) to exit and cleanup everything else. +	 */ +	pthread_mutex_lock(&s_server_lock); +	grpc_running = false; +	if (s_server) { +		grpc_debug("%s: shutdown server", __func__); +		s_server->Shutdown(); +		s_server = NULL;  	} +	pthread_mutex_unlock(&s_server_lock); +	grpc_debug("%s: joining and destroy grpc thread", __func__); +	pthread_join(fpt->thread, NULL); +	frr_pthread_destroy(fpt);  	return 0;  }  | 
