summaryrefslogtreecommitdiff
path: root/lib/northbound_grpc.cpp
diff options
context:
space:
mode:
authorMark Stapp <mjs@voltanet.io>2021-06-07 13:55:11 -0400
committerMark Stapp <mjs@voltanet.io>2021-06-15 11:08:19 -0400
commitde800c10afa3ca5233774d414d64344bf04a3734 (patch)
treee2aa5d7d03d7337b07f7e13ec497460ad579eb26 /lib/northbound_grpc.cpp
parent455d14ae317c461febc02d3731bf8691c2f2ee5f (diff)
lib: cleanup and stop grpc pthread
At shutdown, try to stop the grpc module and its dedicated pthread cleanly. Signed-off-by: Mark Stapp <mjs@voltanet.io>
Diffstat (limited to 'lib/northbound_grpc.cpp')
-rw-r--r--lib/northbound_grpc.cpp42
1 files changed, 30 insertions, 12 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp
index 807d1252c4..71f07dfe86 100644
--- a/lib/northbound_grpc.cpp
+++ b/lib/northbound_grpc.cpp
@@ -1222,7 +1222,7 @@ void HandleUnaryExecute(
frr::NAME##Response>( \
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
&HandleUnary##NAME, #NAME); \
- _rpcState->do_request(service, _cq); \
+ _rpcState->do_request(service, s_cq); \
} while (0)
#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \
@@ -1231,7 +1231,7 @@ void HandleUnaryExecute(
frr::NAME##Response>( \
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
&HandleStreaming##NAME, #NAME); \
- _rpcState->do_request(service, _cq); \
+ _rpcState->do_request(service, s_cq); \
} while (0)
struct grpc_pthread_attr {
@@ -1239,6 +1239,10 @@ struct grpc_pthread_attr {
unsigned long port;
};
+// Capture these objects so we can try to shut down cleanly
+static std::unique_ptr<grpc::Server> s_server;
+static grpc::ServerCompletionQueue *s_cq;
+
static void *grpc_pthread_start(void *arg)
{
struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
@@ -1249,7 +1253,6 @@ static void *grpc_pthread_start(void *arg)
std::stringstream server_address;
frr::Northbound::AsyncService *service =
new frr::Northbound::AsyncService();
- grpc::ServerCompletionQueue *_cq;
frr_pthread_set_name(fpt);
@@ -1258,8 +1261,8 @@ static void *grpc_pthread_start(void *arg)
grpc::InsecureServerCredentials());
builder.RegisterService(service);
auto cq = builder.AddCompletionQueue();
- _cq = cq.get();
- auto server = builder.BuildAndStart();
+ s_cq = cq.get();
+ s_server = builder.BuildAndStart();
/* Schedule all RPC handlers */
REQUEST_NEWRPC(GetCapabilities, NULL);
@@ -1284,10 +1287,12 @@ static void *grpc_pthread_start(void *arg)
void *tag;
bool ok;
- _cq->Next(&tag, &ok);
+ s_cq->Next(&tag, &ok);
+ if (!ok)
+ break;
+
grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__,
tag, ok);
- GPR_ASSERT(ok);
RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
CallState state = rpc->doCallback();
@@ -1302,10 +1307,9 @@ static void *grpc_pthread_start(void *arg)
* user indicating Finish() for cleanup.
*/
if (state == FINISH)
- rpc->do_request(service, _cq);
+ rpc->do_request(service, s_cq);
}
- /*NOTREACHED*/
return NULL;
}
@@ -1326,16 +1330,30 @@ static int frr_grpc_init(uint port)
__func__, safe_strerror(errno));
return -1;
}
- pthread_detach(fpt->thread);
return 0;
}
static int frr_grpc_finish(void)
{
- if (fpt)
+ // Shutdown the grpc server
+ if (s_server) {
+ s_server->Shutdown();
+ s_cq->Shutdown();
+
+ // And drain the queue
+ void *ignore;
+ bool ok;
+
+ while (s_cq->Next(&ignore, &ok))
+ ;
+ }
+
+ if (fpt) {
+ pthread_join(fpt->thread, NULL);
frr_pthread_destroy(fpt);
- // TODO: cancel the gRPC pthreads gracefully.
+ }
+
return 0;
}