summaryrefslogtreecommitdiff
path: root/lib/frr_zmq.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/frr_zmq.c')
-rw-r--r--lib/frr_zmq.c32
1 files changed, 18 insertions, 14 deletions
diff --git a/lib/frr_zmq.c b/lib/frr_zmq.c
index 05f0fce5fc..ce52848a25 100644
--- a/lib/frr_zmq.c
+++ b/lib/frr_zmq.c
@@ -135,8 +135,8 @@ static int frrzmq_read_msg(struct thread *t)
if (read)
frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
- _thread_add_read_write(t->xref, t->master, frrzmq_read_msg, cbp,
- cb->fd, &cb->read.thread);
+ thread_add_read(t->master, frrzmq_read_msg, cbp,
+ cb->fd, &cb->read.thread);
return 0;
out_err:
@@ -191,11 +191,11 @@ int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
if (events & ZMQ_POLLIN) {
thread_cancel(&cb->read.thread);
- _thread_add_event(xref, master, frrzmq_read_msg, cbp, fd,
+ thread_add_event(master, frrzmq_read_msg, cbp, fd,
&cb->read.thread);
} else
- _thread_add_read_write(xref, master, frrzmq_read_msg, cbp, fd,
- &cb->read.thread);
+ thread_add_read(master, frrzmq_read_msg, cbp, fd,
+ &cb->read.thread);
return 0;
}
@@ -241,8 +241,8 @@ static int frrzmq_write_msg(struct thread *t)
if (written)
frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
- _thread_add_read_write(t->xref, t->master, frrzmq_write_msg, cbp,
- cb->fd, &cb->write.thread);
+ thread_add_write(t->master, frrzmq_write_msg, cbp,
+ cb->fd, &cb->write.thread);
return 0;
out_err:
@@ -297,8 +297,8 @@ int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
_thread_add_event(xref, master, frrzmq_write_msg, cbp, fd,
&cb->write.thread);
} else
- _thread_add_read_write(xref, master, frrzmq_write_msg, cbp, fd,
- &cb->write.thread);
+ thread_add_write(master, frrzmq_write_msg, cbp, fd,
+ &cb->write.thread);
return 0;
}
@@ -310,7 +310,7 @@ void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
thread_cancel(&core->thread);
if ((*cb)->read.cancelled && !(*cb)->read.thread
- && (*cb)->write.cancelled && (*cb)->write.thread)
+ && (*cb)->write.cancelled && !(*cb)->write.thread)
XFREE(MTYPE_ZEROMQ_CB, *cb);
}
@@ -330,12 +330,16 @@ void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
len = sizeof(events);
if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
return;
- if (events & event && core->thread && !core->cancelled) {
+ if ((events & event) && core->thread && !core->cancelled) {
struct thread_master *tm = core->thread->master;
+
thread_cancel(&core->thread);
- thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
- : frrzmq_write_msg),
- cbp, cb->fd, &core->thread);
+ if (event == ZMQ_POLLIN)
+ thread_add_event(tm, frrzmq_read_msg,
+ cbp, cb->fd, &core->thread);
+ else
+ thread_add_event(tm, frrzmq_write_msg,
+ cbp, cb->fd, &core->thread);
}
}