thread_cancel(&core->thread);
if ((*cb)->read.cancelled && !(*cb)->read.thread
- && (*cb)->write.cancelled && (*cb)->write.thread)
+ && (*cb)->write.cancelled && !(*cb)->write.thread)
XFREE(MTYPE_ZEROMQ_CB, *cb);
}
len = sizeof(events);
if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
return;
- if (events & event && core->thread && !core->cancelled) {
+ if ((events & event) && core->thread && !core->cancelled) {
struct thread_master *tm = core->thread->master;
+
thread_cancel(&core->thread);
- thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
- : frrzmq_write_msg),
- cbp, cb->fd, &core->thread);
+ if (event == ZMQ_POLLIN)
+ thread_add_event(tm, frrzmq_read_msg,
+ cbp, cb->fd, &core->thread);
+ else
+ thread_add_event(tm, frrzmq_write_msg,
+ cbp, cb->fd, &core->thread);
}
}