diff options
| author | Mark Stapp <mjs@voltanet.io> | 2021-06-07 13:55:11 -0400 |
|---|---|---|
| committer | Mark Stapp <mjs@voltanet.io> | 2021-06-15 11:08:19 -0400 |
| commit | de800c10afa3ca5233774d414d64344bf04a3734 (patch) | |
| tree | e2aa5d7d03d7337b07f7e13ec497460ad579eb26 /lib/northbound_grpc.cpp | |
| parent | 455d14ae317c461febc02d3731bf8691c2f2ee5f (diff) | |
lib: cleanup and stop grpc pthread
At shutdown, try to stop the grpc module and its
dedicated pthread cleanly.
Signed-off-by: Mark Stapp <mjs@voltanet.io>
Diffstat (limited to 'lib/northbound_grpc.cpp')
| -rw-r--r-- | lib/northbound_grpc.cpp | 42 |
1 files changed, 30 insertions, 12 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index 807d1252c4..71f07dfe86 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -1222,7 +1222,7 @@ void HandleUnaryExecute( frr::NAME##Response>( \ (cdb), &frr::Northbound::AsyncService::Request##NAME, \ &HandleUnary##NAME, #NAME); \ - _rpcState->do_request(service, _cq); \ + _rpcState->do_request(service, s_cq); \ } while (0) #define REQUEST_NEWRPC_STREAMING(NAME, cdb) \ @@ -1231,7 +1231,7 @@ void HandleUnaryExecute( frr::NAME##Response>( \ (cdb), &frr::Northbound::AsyncService::Request##NAME, \ &HandleStreaming##NAME, #NAME); \ - _rpcState->do_request(service, _cq); \ + _rpcState->do_request(service, s_cq); \ } while (0) struct grpc_pthread_attr { @@ -1239,6 +1239,10 @@ struct grpc_pthread_attr { unsigned long port; }; +// Capture these objects so we can try to shut down cleanly +static std::unique_ptr<grpc::Server> s_server; +static grpc::ServerCompletionQueue *s_cq; + static void *grpc_pthread_start(void *arg) { struct frr_pthread *fpt = static_cast<frr_pthread *>(arg); @@ -1249,7 +1253,6 @@ static void *grpc_pthread_start(void *arg) std::stringstream server_address; frr::Northbound::AsyncService *service = new frr::Northbound::AsyncService(); - grpc::ServerCompletionQueue *_cq; frr_pthread_set_name(fpt); @@ -1258,8 +1261,8 @@ static void *grpc_pthread_start(void *arg) grpc::InsecureServerCredentials()); builder.RegisterService(service); auto cq = builder.AddCompletionQueue(); - _cq = cq.get(); - auto server = builder.BuildAndStart(); + s_cq = cq.get(); + s_server = builder.BuildAndStart(); /* Schedule all RPC handlers */ REQUEST_NEWRPC(GetCapabilities, NULL); @@ -1284,10 +1287,12 @@ static void *grpc_pthread_start(void *arg) void *tag; bool ok; - _cq->Next(&tag, &ok); + s_cq->Next(&tag, &ok); + if (!ok) + break; + grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__, tag, ok); - GPR_ASSERT(ok); RpcStateBase *rpc = static_cast<RpcStateBase *>(tag); CallState state = rpc->doCallback(); @@ -1302,10 +1307,9 @@ static void *grpc_pthread_start(void *arg) * user indicating Finish() for cleanup. */ if (state == FINISH) - rpc->do_request(service, _cq); + rpc->do_request(service, s_cq); } - /*NOTREACHED*/ return NULL; } @@ -1326,16 +1330,30 @@ static int frr_grpc_init(uint port) __func__, safe_strerror(errno)); return -1; } - pthread_detach(fpt->thread); return 0; } static int frr_grpc_finish(void) { - if (fpt) + // Shutdown the grpc server + if (s_server) { + s_server->Shutdown(); + s_cq->Shutdown(); + + // And drain the queue + void *ignore; + bool ok; + + while (s_cq->Next(&ignore, &ok)) + ; + } + + if (fpt) { + pthread_join(fpt->thread, NULL); frr_pthread_destroy(fpt); - // TODO: cancel the gRPC pthreads gracefully. + } + return 0; } |
