Diffstat (limited to 'lib')
-rw-r--r--   lib/frr_pthread.c    15
-rw-r--r--   lib/frr_pthread.h     3
-rw-r--r--   lib/frrcu.c         527
-rw-r--r--   lib/frrcu.h         172
-rw-r--r--   lib/libfrr.c          2
-rw-r--r--   lib/log.c             4
-rw-r--r--   lib/seqlock.c       190
-rw-r--r--   lib/seqlock.h        40
-rw-r--r--   lib/sigevent.c       42
-rw-r--r--   lib/subdir.am         2
-rw-r--r--   lib/thread.c          6
11 files changed, 931 insertions(+), 72 deletions(-)
diff --git a/lib/frr_pthread.c b/lib/frr_pthread.c
index e588571c01..bdb6c2a397 100644
--- a/lib/frr_pthread.c
+++ b/lib/frr_pthread.c
@@ -133,18 +133,29 @@ int frr_pthread_set_name(struct frr_pthread *fpt)
return ret;
}
+static void *frr_pthread_inner(void *arg)
+{
+ struct frr_pthread *fpt = arg;
+
+ rcu_thread_start(fpt->rcu_thread);
+ return fpt->attr.start(fpt);
+}
+
int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr)
{
int ret;
- ret = pthread_create(&fpt->thread, attr, fpt->attr.start, fpt);
+ fpt->rcu_thread = rcu_thread_prepare();
+ ret = pthread_create(&fpt->thread, attr, frr_pthread_inner, fpt);
/*
* Per pthread_create(3), the contents of fpt->thread are undefined if
* pthread_create() did not succeed. Reset this value to zero.
*/
- if (ret < 0)
+ if (ret < 0) {
+ rcu_thread_unprepare(fpt->rcu_thread);
memset(&fpt->thread, 0x00, sizeof(fpt->thread));
+ }
return ret;
}
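
The hunk above shows the generic pattern for handing RCU state into a new
pthread. As a rough illustration only (not part of this change), the same
handoff with a bare pthread_create() might look like the following sketch;
worker_arg, worker_fn and spawn_worker are made-up names:

#include <pthread.h>
#include "frrcu.h"

struct worker_arg {
	struct rcu_thread *rcu_thread;	/* from rcu_thread_prepare() */
	void *data;
};

static void *worker_fn(void *arg)
{
	struct worker_arg *wa = arg;

	/* child starts RCU-held at depth 1, on the epoch the parent held */
	rcu_thread_start(wa->rcu_thread);
	/* ... do work; drop the lock with rcu_read_unlock() when done ... */
	return NULL;
}

int spawn_worker(struct worker_arg *wa)
{
	pthread_t tid;
	int ret;

	rcu_read_lock();		/* rcu_thread_prepare() needs RCU held */
	wa->rcu_thread = rcu_thread_prepare();
	ret = pthread_create(&tid, NULL, worker_fn, wa);
	if (ret)
		rcu_thread_unprepare(wa->rcu_thread);
	rcu_read_unlock();
	return ret;
}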
diff --git a/lib/frr_pthread.h b/lib/frr_pthread.h
index 3afe7ba966..6096a50370 100644
--- a/lib/frr_pthread.h
+++ b/lib/frr_pthread.h
@@ -23,6 +23,7 @@
#include <pthread.h>
#include "frratomic.h"
#include "memory.h"
+#include "frrcu.h"
#include "thread.h"
#ifdef __cplusplus
@@ -50,6 +51,8 @@ struct frr_pthread {
/* pthread id */
pthread_t thread;
+ struct rcu_thread *rcu_thread;
+
/* thread master for this pthread's thread.c event loop */
struct thread_master *master;
diff --git a/lib/frrcu.c b/lib/frrcu.c
new file mode 100644
index 0000000000..7e6475b648
--- /dev/null
+++ b/lib/frrcu.c
@@ -0,0 +1,527 @@
+/*
+ * Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc.
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+/* implementation notes: this is an epoch-based RCU implementation. rcu_seq
+ * (global variable) counts the current epoch. Threads hold a specific epoch
+ * in rcu_read_lock(). This is the oldest epoch a thread might be accessing
+ * data from.
+ *
+ * The rcu_seq global is only pushed forward on rcu_read_lock() and
+ * rcu_read_unlock() calls. This makes things a tad more efficient since
+ * those are the only places it matters:
+ * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
+ * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
+ * when heading into a long idle period where no thread holds RCU
+ *
+ * rcu_thread structures themselves are RCU-free'd.
+ *
+ * rcu_head structures are the most iffy; normally for an ATOMLIST we would
+ * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
+ * to prevent ABA or use-after-free problems. However, our ATOMLIST code
+ * guarantees that if the list remains non-empty in all cases, we only need
+ * the "last" pointer to do an "add_tail()", i.e. we can't run into ABA/UAF
+ * issues - but we do need to keep at least 1 item on the list.
+ *
+ * (Search the atomlist code for all uses of "last")
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <pthread.h>
+#ifdef HAVE_PTHREAD_NP_H
+#include <pthread_np.h>
+#endif
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+
+#include "frrcu.h"
+#include "seqlock.h"
+#include "atomlist.h"
+
+DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread")
+DEFINE_MTYPE_STATIC(LIB, RCU_NEXT, "RCU sequence barrier")
+
+DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head)
+
+PREDECL_ATOMLIST(rcu_threads)
+struct rcu_thread {
+ struct rcu_threads_item head;
+
+ struct rcu_head rcu_head;
+
+ struct seqlock rcu;
+
+ /* only accessed by thread itself, not atomic */
+ unsigned depth;
+};
+DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head)
+
+static const struct rcu_action rcua_next = { .type = RCUA_NEXT };
+static const struct rcu_action rcua_end = { .type = RCUA_END };
+static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };
+
+struct rcu_next {
+ struct rcu_head head_free;
+ struct rcu_head head_next;
+};
+
+#define rcu_free_internal(mtype, ptr, field) \
+ do { \
+ typeof(ptr) _ptr = (ptr); \
+ struct rcu_head *_rcu_head = &_ptr->field; \
+ static const struct rcu_action _rcu_action = { \
+ .type = RCUA_FREE, \
+ .u.free = { \
+ .mt = mtype, \
+ .offset = offsetof(typeof(*_ptr), field), \
+ }, \
+ }; \
+ _rcu_head->action = &_rcu_action; \
+ rcu_heads_add_tail(&rcu_heads, _rcu_head); \
+ } while (0)
+
+/* primary global RCU position */
+static struct seqlock rcu_seq;
+/* this is set to rcu_seq whenever something is added on the RCU queue.
+ * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
+ */
+static _Atomic seqlock_val_t rcu_dirty;
+
+static struct rcu_threads_head rcu_threads;
+static struct rcu_heads_head rcu_heads;
+
+/* main thread & RCU sweeper have pre-setup rcu_thread structures. The
+ * reasons are different:
+ *
+ * - rcu_thread_main is there because the main thread isn't started like
+ * other threads, it's implicitly created when the program is started. So
+ * rcu_thread_main matches up implicitly.
+ *
+ * - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no
+ * sense really), it only exists so we can call RCU-using functions from
+ * the RCU thread without special handling in rcu_read_lock/unlock.
+ */
+static struct rcu_thread rcu_thread_main;
+static struct rcu_thread rcu_thread_rcu;
+
+static pthread_t rcu_pthread;
+static pthread_key_t rcu_thread_key;
+static bool rcu_active;
+
+static void rcu_start(void);
+static void rcu_bump(void);
+
+/*
+ * preinitialization for main thread
+ */
+static void rcu_thread_end(void *rcu_thread);
+
+static void rcu_preinit(void) __attribute__((constructor));
+static void rcu_preinit(void)
+{
+ struct rcu_thread *rt;
+
+ rt = &rcu_thread_main;
+ rt->depth = 1;
+ seqlock_init(&rt->rcu);
+ seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
+
+ pthread_key_create(&rcu_thread_key, rcu_thread_end);
+ pthread_setspecific(rcu_thread_key, rt);
+
+ rcu_threads_add_tail(&rcu_threads, rt);
+
+ /* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
+ rt = &rcu_thread_rcu;
+ rt->depth = 1;
+
+ seqlock_init(&rcu_seq);
+ seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
+}
+
+static struct rcu_thread *rcu_self(void)
+{
+ return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
+}
+
+/*
+ * thread management (for the non-main thread)
+ */
+struct rcu_thread *rcu_thread_prepare(void)
+{
+ struct rcu_thread *rt, *cur;
+
+ rcu_assert_read_locked();
+
+ if (!rcu_active)
+ rcu_start();
+
+ cur = rcu_self();
+ assert(cur->depth);
+
+ /* new thread always starts with rcu_read_lock held at depth 1, and
+ * holding the same epoch as the parent (this makes it possible to
+ * use RCU for things passed into the thread through its arg)
+ */
+ rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
+ rt->depth = 1;
+
+ seqlock_init(&rt->rcu);
+ seqlock_acquire(&rt->rcu, &cur->rcu);
+
+ rcu_threads_add_tail(&rcu_threads, rt);
+
+ return rt;
+}
+
+void rcu_thread_start(struct rcu_thread *rt)
+{
+ pthread_setspecific(rcu_thread_key, rt);
+}
+
+void rcu_thread_unprepare(struct rcu_thread *rt)
+{
+ if (rt == &rcu_thread_rcu)
+ return;
+
+ rt->depth = 1;
+ seqlock_acquire(&rt->rcu, &rcu_seq);
+
+ rcu_bump();
+ if (rt != &rcu_thread_main)
+ /* this free() happens after seqlock_release() below */
+ rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);
+
+ rcu_threads_del(&rcu_threads, rt);
+ seqlock_release(&rt->rcu);
+}
+
+static void rcu_thread_end(void *rtvoid)
+{
+ struct rcu_thread *rt = rtvoid;
+ rcu_thread_unprepare(rt);
+}
+
+/*
+ * main RCU control aspects
+ */
+
+static void rcu_bump(void)
+{
+ struct rcu_next *rn;
+
+ rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn));
+
+ /* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
+ * This means we don't need to communicate which seqno is which
+ * RCUA_NEXT, since we really don't care.
+ */
+
+ /*
+ * Important race condition: while rcu_heads_add_tail is executing,
+ * there is an intermediate point where the rcu_heads "last" pointer
+ * already points to rn->head_next, but rn->head_next isn't added to
+ * the list yet. That means any other "add_tail" calls append to this
+ * item, which isn't fully on the list yet. Freeze this thread at
+ * that point and look at another thread doing a rcu_bump. It adds
+ * these two items and then does a seqlock_bump. But the rcu_heads
+ * list is still "interrupted" and there's no RCUA_NEXT on the list
+ * yet (from either the frozen thread or the second thread). So
+ * rcu_main() might actually hit the end of the list at the
+ * "interrupt".
+ *
+ * This situation is prevented by requiring that rcu_read_lock is held
+ * for any calls to rcu_bump, since if we're holding the current RCU
+ * epoch, that means rcu_main can't be chewing on rcu_heads and hit
+ * that interruption point. Only once the thread has continued to
+ * rcu_read_unlock() - and therefore completed the add_tail - does the
+ * RCU sweeper gobble up the epoch, and only then can it be sure to find
+ * at least the RCUA_NEXT and RCUA_FREE items on rcu_heads.
+ */
+ rn->head_next.action = &rcua_next;
+ rcu_heads_add_tail(&rcu_heads, &rn->head_next);
+
+ /* free rn that we allocated above.
+ *
+ * This is INTENTIONALLY not built into the RCUA_NEXT action. This
+ * ensures that after the action above is popped off the queue, there
+ * is still at least 1 item on the RCU queue. This means we never
+ * delete the last item, which is extremely important since it keeps
+ * the atomlist ->last pointer alive and well.
+ *
+ * If we were to "run dry" on the RCU queue, add_tail may run into the
+ * "last item is being deleted - start over" case, and then we may end
+ * up accessing old RCU queue items that are already free'd.
+ */
+ rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free);
+
+ /* Only allow the RCU sweeper to run after these 2 items are queued.
+ *
+ * If another thread enqueues some RCU action in the intermediate
+ * window here, nothing bad happens - the queued action is associated
+ * with a larger seq# than strictly necessary. Thus, it might get
+ * executed a bit later, but that's not a problem.
+ *
+ * If another thread acquires the read lock in this window, it holds
+ * the previous epoch, but its RCU queue actions will be in the next
+ * epoch. This isn't a problem either, just a tad inefficient.
+ */
+ seqlock_bump(&rcu_seq);
+}
+
+static void rcu_bump_maybe(void)
+{
+ seqlock_val_t dirty;
+
+ dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
+ /* no problem if we race here and multiple threads bump rcu_seq;
+ * bumping too much causes no issues while not bumping enough will
+ * result in delayed cleanup
+ */
+ if (dirty == seqlock_cur(&rcu_seq))
+ rcu_bump();
+}
+
+void rcu_read_lock(void)
+{
+ struct rcu_thread *rt = rcu_self();
+
+ assert(rt);
+ if (rt->depth++ > 0)
+ return;
+
+ seqlock_acquire(&rt->rcu, &rcu_seq);
+ /* need to hold RCU for bump ... */
+ rcu_bump_maybe();
+ /* ... but no point in holding the old epoch if we just bumped */
+ seqlock_acquire(&rt->rcu, &rcu_seq);
+}
+
+void rcu_read_unlock(void)
+{
+ struct rcu_thread *rt = rcu_self();
+
+ assert(rt && rt->depth);
+ if (--rt->depth > 0)
+ return;
+ rcu_bump_maybe();
+ seqlock_release(&rt->rcu);
+}
+
+void rcu_assert_read_locked(void)
+{
+ struct rcu_thread *rt = rcu_self();
+ assert(rt && rt->depth && seqlock_held(&rt->rcu));
+}
+
+void rcu_assert_read_unlocked(void)
+{
+ struct rcu_thread *rt = rcu_self();
+ assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
+}
+
+/*
+ * RCU resource-release thread
+ */
+
+static void *rcu_main(void *arg);
+
+static void rcu_start(void)
+{
+ /* ensure we never handle signals on the RCU thread by blocking
+ * everything here (new thread inherits signal mask)
+ */
+ sigset_t oldsigs, blocksigs;
+
+ sigfillset(&blocksigs);
+ pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);
+
+ rcu_active = true;
+
+ assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));
+
+ pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
+
+#ifdef HAVE_PTHREAD_SETNAME_NP
+# ifdef GNU_LINUX
+ pthread_setname_np(rcu_pthread, "RCU sweeper");
+# elif defined(__NetBSD__)
+ pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
+# endif
+#elif defined(HAVE_PTHREAD_SET_NAME_NP)
+ pthread_set_name_np(rcu_pthread, "RCU sweeper");
+#endif
+}
+
+static void rcu_do(struct rcu_head *rh)
+{
+ struct rcu_head_close *rhc;
+ void *p;
+
+ switch (rh->action->type) {
+ case RCUA_FREE:
+ p = (char *)rh - rh->action->u.free.offset;
+ if (rh->action->u.free.mt)
+ qfree(rh->action->u.free.mt, p);
+ else
+ free(p);
+ break;
+ case RCUA_CLOSE:
+ rhc = container_of(rh, struct rcu_head_close,
+ rcu_head);
+ close(rhc->fd);
+ break;
+ case RCUA_CALL:
+ p = (char *)rh - rh->action->u.call.offset;
+ rh->action->u.call.fptr(p);
+ break;
+
+ case RCUA_INVALID:
+ case RCUA_NEXT:
+ case RCUA_END:
+ default:
+ assert(0);
+ }
+}
+
+static void rcu_watchdog(struct rcu_thread *rt)
+{
+#if 0
+ /* future work: print a backtrace for the thread that's holding up
+ * RCU. The only (good) way of doing that is to send a signal to the
+ * other thread, save away the backtrace in the signal handler, and
+ * block here until the signal is done processing.
+ *
+ * Just haven't implemented that yet.
+ */
+ fprintf(stderr, "RCU watchdog %p\n", rt);
+#endif
+}
+
+static void *rcu_main(void *arg)
+{
+ struct rcu_thread *rt;
+ struct rcu_head *rh = NULL;
+ bool end = false;
+ struct timespec maxwait;
+
+ seqlock_val_t rcuval = SEQLOCK_STARTVAL;
+
+ pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);
+
+ while (!end) {
+ seqlock_wait(&rcu_seq, rcuval);
+
+ /* RCU watchdog timeout, TODO: configurable value */
+ clock_gettime(CLOCK_MONOTONIC, &maxwait);
+ maxwait.tv_nsec += 100 * 1000 * 1000;
+ if (maxwait.tv_nsec >= 1000000000) {
+ maxwait.tv_sec++;
+ maxwait.tv_nsec -= 1000000000;
+ }
+
+ frr_each (rcu_threads, &rcu_threads, rt)
+ if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
+ rcu_watchdog(rt);
+ seqlock_wait(&rt->rcu, rcuval);
+ }
+
+ while ((rh = rcu_heads_pop(&rcu_heads))) {
+ if (rh->action->type == RCUA_NEXT)
+ break;
+ else if (rh->action->type == RCUA_END)
+ end = true;
+ else
+ rcu_do(rh);
+ }
+
+ rcuval += SEQLOCK_INCR;
+ }
+
+ /* rcu_shutdown can only be called singlethreaded, and it does a
+ * pthread_join, so it should be impossible that anything ended up
+ * on the queue after RCUA_END
+ */
+#if 1
+ assert(!rcu_heads_first(&rcu_heads));
+#else
+ while ((rh = rcu_heads_pop(&rcu_heads)))
+ if (rh->action->type >= RCUA_FREE)
+ rcu_do(rh);
+#endif
+ return NULL;
+}
+
+void rcu_shutdown(void)
+{
+ static struct rcu_head rcu_head_end;
+ struct rcu_thread *rt = rcu_self();
+ void *retval;
+
+ if (!rcu_active)
+ return;
+
+ rcu_assert_read_locked();
+ assert(rcu_threads_count(&rcu_threads) == 1);
+
+ rcu_enqueue(&rcu_head_end, &rcua_end);
+
+ rt->depth = 0;
+ seqlock_release(&rt->rcu);
+ seqlock_release(&rcu_seq);
+ rcu_active = false;
+
+ /* clearing rcu_active is before pthread_join in case we hang in
+ * pthread_join & get a SIGTERM or something - in that case, just
+ * ignore the maybe-still-running RCU thread
+ */
+ if (pthread_join(rcu_pthread, &retval) == 0) {
+ seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
+ seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
+ rt->depth = 1;
+ }
+}
+
+/*
+ * RCU'd free functions
+ */
+
+void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
+{
+ /* refer to rcu_bump() for why we need to hold RCU when adding items
+ * to rcu_heads
+ */
+ rcu_assert_read_locked();
+
+ rh->action = action;
+
+ if (!rcu_active) {
+ rcu_do(rh);
+ return;
+ }
+ rcu_heads_add_tail(&rcu_heads, rh);
+ atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
+ memory_order_relaxed);
+}
+
+void rcu_close(struct rcu_head_close *rhc, int fd)
+{
+ rhc->fd = fd;
+ rcu_enqueue(&rhc->rcu_head, &rcua_close);
+}
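
To make the deferred-close path at the end of frrcu.c concrete, here is a
small sketch under assumed names (struct conn and conn_drop() are invented;
only rcu_close() and struct rcu_head_close come from this commit):

#include "frrcu.h"

struct conn {
	int fd;
	struct rcu_head_close rcu_fd;	/* embedded, declared in frrcu.h */
};

/* The fd is not closed here; it is queued and only closed by the RCU
 * sweeper once every thread has left the epoch that might still be using
 * it, so a recycled fd number can't be confused with this one.
 */
void conn_drop(struct conn *c)
{
	rcu_assert_read_locked();	/* rcu_enqueue() requires RCU held */
	rcu_close(&c->rcu_fd, c->fd);
}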
diff --git a/lib/frrcu.h b/lib/frrcu.h
new file mode 100644
index 0000000000..8f789303cc
--- /dev/null
+++ b/lib/frrcu.h
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc.
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef _FRRCU_H
+#define _FRRCU_H
+
+#include "memory.h"
+#include "atomlist.h"
+#include "seqlock.h"
+
+/* quick RCU primer:
+ * There's a global sequence counter. Whenever a thread does a
+ * rcu_read_lock(), it is marked as holding the current sequence counter.
+ * When something is cleaned with RCU, the global sequence counter is
+ * increased and the item is queued for cleanup - *after* all threads are
+ * at a more recent sequence counter (or no sequence counter / unheld).
+ *
+ * So, by delaying resource cleanup, RCU ensures that things don't go away
+ * while another thread may hold a (stale) reference.
+ *
+ * Note that even if a thread is in rcu_read_lock(), it is invalid for that
+ * thread to access an object after rcu_free() & co. have been called on
+ * it. This is a design choice that allows no-op'ing out the entire RCU
+ * mechanism if we're running singlethreaded. (It also allows some
+ * optimization on the counter bumping.)
+ *
+ * differences from Linux Kernel RCU:
+ * - there's no rcu_synchronize(); if you really need to defer something,
+ * use rcu_call() (and double-check it's really necessary)
+ * - rcu_dereference() and rcu_assign_pointer() don't exist, use atomic_*
+ * instead (ATOM* list structures do the right thing)
+ */
+
+/* opaque */
+struct rcu_thread;
+
+/* called before new thread creation, sets up rcu thread info for new thread
+ * before it actually exists. This ensures possible RCU references are held
+ * for thread startup.
+ *
+ * return value must be passed into the new thread's call to rcu_thread_start()
+ */
+extern struct rcu_thread *rcu_thread_prepare(void);
+
+/* cleanup in case pthread_create() fails */
+extern void rcu_thread_unprepare(struct rcu_thread *rcu_thread);
+
+/* called early in the new thread, with the return value from the above.
+ * NB: new thread is initially in RCU-held state! (at depth 1)
+ *
+ * TBD: maybe inherit RCU state from rcu_thread_prepare()?
+ */
+extern void rcu_thread_start(struct rcu_thread *rcu_thread);
+
+/* thread exit is handled through pthread_key_create's destructor function */
+
+/* global RCU shutdown - must be called with only 1 active thread left. waits
+ * until remaining RCU actions are done & RCU thread has exited.
+ *
+ * This is mostly here to get a clean exit without memleaks.
+ */
+extern void rcu_shutdown(void);
+
+/* enter / exit RCU-held state. counter-based, so can be called nested. */
+extern void rcu_read_lock(void);
+extern void rcu_read_unlock(void);
+
+/* for debugging / safety checks */
+extern void rcu_assert_read_locked(void);
+extern void rcu_assert_read_unlocked(void);
+
+enum rcu_action_type {
+ RCUA_INVALID = 0,
+ /* used internally by the RCU code, shouldn't ever show up outside */
+ RCUA_NEXT,
+ RCUA_END,
+ /* normal RCU actions, for outside use */
+ RCUA_FREE,
+ RCUA_CLOSE,
+ RCUA_CALL,
+};
+
+/* since rcu_head is intended to be embedded into structs which may exist
+ * with lots of copies, rcu_head is shrunk down to its absolute minimum -
+ * the atomlist pointer + a pointer to this action struct.
+ */
+struct rcu_action {
+ enum rcu_action_type type;
+
+ union {
+ struct {
+ struct memtype *mt;
+ ptrdiff_t offset;
+ } free;
+
+ struct {
+ void (*fptr)(void *arg);
+ ptrdiff_t offset;
+ } call;
+ } u;
+};
+
+/* RCU cleanup function queue item */
+PREDECL_ATOMLIST(rcu_heads)
+struct rcu_head {
+ struct rcu_heads_item head;
+ const struct rcu_action *action;
+};
+
+/* special RCU head for delayed fd-close */
+struct rcu_head_close {
+ struct rcu_head rcu_head;
+ int fd;
+};
+
+/* enqueue RCU action - use the macros below to get the rcu_action set up */
+extern void rcu_enqueue(struct rcu_head *head, const struct rcu_action *action);
+
+/* RCU free() and file close() operations.
+ *
+ * freed memory / closed fds become _immediately_ unavailable to the calling
+ * thread, but will remain available for other threads until they have passed
+ * into RCU-released state.
+ */
+
+/* may be called with NULL mt to do non-MTYPE free() */
+#define rcu_free(mtype, ptr, field) \
+ do { \
+ typeof(ptr) _ptr = (ptr); \
+ struct rcu_head *_rcu_head = &_ptr->field; \
+ static const struct rcu_action _rcu_action = { \
+ .type = RCUA_FREE, \
+ .u.free = { \
+ .mt = mtype, \
+ .offset = offsetof(typeof(*_ptr), field), \
+ }, \
+ }; \
+ rcu_enqueue(_rcu_head, &_rcu_action); \
+ } while (0)
+
+/* use this sparingly, it runs on (and blocks) the RCU thread */
+#define rcu_call(func, ptr, field) \
+ do { \
+ typeof(ptr) _ptr = (ptr); \
+ void (*_fptype)(typeof(ptr)); \
+ struct rcu_head *_rcu_head = &_ptr->field; \
+ static const struct rcu_action _rcu_action = { \
+ .type = RCUA_CALL, \
+ .u.call = { \
+ .fptr = (void *)func, \
+ .offset = offsetof(typeof(*_ptr), field), \
+ }, \
+ }; \
+ (void)(_fptype = func); \
+ rcu_enqueue(_rcu_head, &_rcu_action); \
+ } while (0)
+
+extern void rcu_close(struct rcu_head_close *head, int fd);
+
+#endif /* _FRRCU_H */
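
A brief usage sketch of the API documented in this header; struct peer,
peer_delete() and peer_dump() are hypothetical, and NULL is passed as the
memtype so a plain free() is used, as the comment on rcu_free() permits:

#include "frrcu.h"

struct peer {
	int af;
	struct rcu_head rcu_head;	/* embedded cleanup handle */
};

/* writer side: unlink the object first, then hand the memory to RCU.
 * It stays valid for any thread still inside an older epoch.
 */
void peer_delete(struct peer *p)
{
	rcu_read_lock();		/* rcu_enqueue() asserts RCU is held */
	/* ... remove p from all lists / lookup structures here ... */
	rcu_free(NULL, p, rcu_head);	/* NULL memtype => plain free() */
	rcu_read_unlock();
}

/* reader side: pointers loaded while RCU-held stay dereferenceable even if
 * another thread runs peer_delete() concurrently.
 */
void peer_dump(struct peer *p)
{
	rcu_read_lock();
	(void)p->af;			/* safe until we drop our epoch */
	rcu_read_unlock();
}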
diff --git a/lib/libfrr.c b/lib/libfrr.c
index 0fc321d6e0..35c6092140 100644
--- a/lib/libfrr.c
+++ b/lib/libfrr.c
@@ -41,6 +41,7 @@
#include "northbound_cli.h"
#include "northbound_db.h"
#include "debug.h"
+#include "frrcu.h"
DEFINE_HOOK(frr_late_init, (struct thread_master * tm), (tm))
DEFINE_KOOH(frr_early_fini, (), ())
@@ -1081,6 +1082,7 @@ void frr_fini(void)
master = NULL;
closezlog();
/* frrmod_init -> nothing needed / hooks */
+ rcu_shutdown();
if (!debug_memstats_at_exit)
return;
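
For orientation, a hedged sketch of the shutdown ordering rcu_shutdown()
expects; stop_worker_pthreads() and daemon_teardown() are placeholders here,
not FRR functions:

#include "frrcu.h"

void stop_worker_pthreads(void);	/* placeholder: join all other threads */

void daemon_teardown(void)
{
	stop_worker_pthreads();		/* only the main thread may remain */
	rcu_assert_read_locked();	/* main thread holds RCU by default */
	rcu_shutdown();			/* drains queued actions, joins the sweeper */
}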
diff --git a/lib/log.c b/lib/log.c
index c577239908..51a0ddd6b7 100644
--- a/lib/log.c
+++ b/lib/log.c
@@ -559,9 +559,7 @@ static void crash_write(struct fbuf *fb, char *msgstart)
void zlog_signal(int signo, const char *action, void *siginfo_v,
void *program_counter)
{
-#ifdef SA_SIGINFO
siginfo_t *siginfo = siginfo_v;
-#endif
time_t now;
char buf[sizeof("DEFAULT: Received signal S at T (si_addr 0xP, PC 0xP); aborting...")
+ 100];
@@ -575,7 +573,6 @@ void zlog_signal(int signo, const char *action, void *siginfo_v,
msgstart = fb.pos;
bprintfrr(&fb, "Received signal %d at %lld", signo, (long long)now);
-#ifdef SA_SIGINFO
if (program_counter)
bprintfrr(&fb, " (si_addr 0x%tx, PC 0x%tx)",
(ptrdiff_t)siginfo->si_addr,
@@ -583,7 +580,6 @@ void zlog_signal(int signo, const char *action, void *siginfo_v,
else
bprintfrr(&fb, " (si_addr 0x%tx)",
(ptrdiff_t)siginfo->si_addr);
-#endif /* SA_SIGINFO */
bprintfrr(&fb, "; %s\n", action);
crash_write(&fb, msgstart);
diff --git a/lib/seqlock.c b/lib/seqlock.c
index 223d14952c..c05ec19db4 100644
--- a/lib/seqlock.c
+++ b/lib/seqlock.c
@@ -25,6 +25,7 @@
#include "config.h"
#endif
+#include <string.h>
#include <unistd.h>
#include <limits.h>
#include <errno.h>
@@ -35,44 +36,75 @@
#include "seqlock.h"
+/****************************************
+ * OS specific synchronization wrappers *
+ ****************************************/
+
+/*
+ * Linux: sys_futex()
+ */
#ifdef HAVE_SYNC_LINUX_FUTEX
-/* Linux-specific - sys_futex() */
#include <sys/syscall.h>
#include <linux/futex.h>
-static long sys_futex(void *addr1, int op, int val1, struct timespec *timeout,
- void *addr2, int val3)
+static long sys_futex(void *addr1, int op, int val1,
+ const struct timespec *timeout, void *addr2, int val3)
{
return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
}
#define wait_once(sqlo, val) \
sys_futex((int *)&sqlo->pos, FUTEX_WAIT, (int)val, NULL, NULL, 0)
+#define wait_time(sqlo, val, time, reltime) \
+ sys_futex((int *)&sqlo->pos, FUTEX_WAIT_BITSET, (int)val, time, \
+ NULL, ~0U)
#define wait_poke(sqlo) \
sys_futex((int *)&sqlo->pos, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)
+/*
+ * OpenBSD: sys_futex(), almost the same as on Linux
+ */
#elif defined(HAVE_SYNC_OPENBSD_FUTEX)
-/* OpenBSD variant of the above. untested, not upstream in OpenBSD. */
#include <sys/syscall.h>
#include <sys/futex.h>
+#define TIME_RELATIVE 1
+
#define wait_once(sqlo, val) \
futex((int *)&sqlo->pos, FUTEX_WAIT, (int)val, NULL, NULL, 0)
+#define wait_time(sqlo, val, time, reltime) \
+ futex((int *)&sqlo->pos, FUTEX_WAIT, (int)val, reltime, NULL, 0)
#define wait_poke(sqlo) \
futex((int *)&sqlo->pos, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)
+/*
+ * FreeBSD: _umtx_op()
+ */
#elif defined(HAVE_SYNC_UMTX_OP)
-/* FreeBSD-specific: umtx_op() */
#include <sys/umtx.h>
#define wait_once(sqlo, val) \
_umtx_op((void *)&sqlo->pos, UMTX_OP_WAIT_UINT, val, NULL, NULL)
+static int wait_time(struct seqlock *sqlo, uint32_t val,
+ const struct timespec *abstime,
+ const struct timespec *reltime)
+{
+ struct _umtx_time t;
+ t._flags = UMTX_ABSTIME;
+ t._clockid = CLOCK_MONOTONIC;
+ memcpy(&t._timeout, abstime, sizeof(t._timeout));
+ return _umtx_op((void *)&sqlo->pos, UMTX_OP_WAIT_UINT, val,
+ (void *)(uintptr_t) sizeof(t), &t);
+}
#define wait_poke(sqlo) \
_umtx_op((void *)&sqlo->pos, UMTX_OP_WAKE, INT_MAX, NULL, NULL)
-#else
-/* generic version. used on *BSD, Solaris and OSX.
+/*
+ * generic version. used on NetBSD, Solaris and OSX. really shitty.
*/
+#else
+
+#define TIME_ABS_REALTIME 1
#define wait_init(sqlo) do { \
pthread_mutex_init(&sqlo->lock, NULL); \
@@ -80,6 +112,9 @@ static long sys_futex(void *addr1, int op, int val1, struct timespec *timeout,
} while (0)
#define wait_prep(sqlo) pthread_mutex_lock(&sqlo->lock)
#define wait_once(sqlo, val) pthread_cond_wait(&sqlo->wake, &sqlo->lock)
+#define wait_time(sqlo, val, time, reltime) \
+ pthread_cond_timedwait(&sqlo->wake, \
+ &sqlo->lock, time);
#define wait_done(sqlo) pthread_mutex_unlock(&sqlo->lock)
#define wait_poke(sqlo) do { \
pthread_mutex_lock(&sqlo->lock); \
@@ -103,18 +138,112 @@ void seqlock_wait(struct seqlock *sqlo, seqlock_val_t val)
seqlock_assert_valid(val);
wait_prep(sqlo);
- while (1) {
- cur = atomic_load_explicit(&sqlo->pos, memory_order_acquire);
- if (!(cur & 1))
+ cur = atomic_load_explicit(&sqlo->pos, memory_order_relaxed);
+
+ while (cur & SEQLOCK_HELD) {
+ cal = SEQLOCK_VAL(cur) - val - 1;
+ assert(cal < 0x40000000 || cal > 0xc0000000);
+ if (cal < 0x80000000)
break;
- cal = cur - val - 1;
+
+ if ((cur & SEQLOCK_WAITERS)
+ || atomic_compare_exchange_weak_explicit(
+ &sqlo->pos, &cur, cur | SEQLOCK_WAITERS,
+ memory_order_relaxed, memory_order_relaxed)) {
+ wait_once(sqlo, cur | SEQLOCK_WAITERS);
+ cur = atomic_load_explicit(&sqlo->pos,
+ memory_order_relaxed);
+ }
+ /* else: we failed to swap in cur because it just changed */
+ }
+ wait_done(sqlo);
+}
+
+bool seqlock_timedwait(struct seqlock *sqlo, seqlock_val_t val,
+ const struct timespec *abs_monotime_limit)
+{
+/*
+ * ABS_REALTIME - used on NetBSD, Solaris and OSX
+ */
+#if TIME_ABS_REALTIME
+#define time_arg1 &abs_rt
+#define time_arg2 NULL
+#define time_prep
+ struct timespec curmono, abs_rt;
+
+ clock_gettime(CLOCK_MONOTONIC, &curmono);
+ clock_gettime(CLOCK_REALTIME, &abs_rt);
+
+ abs_rt.tv_nsec += abs_monotime_limit->tv_nsec - curmono.tv_nsec;
+ if (abs_rt.tv_nsec < 0) {
+ abs_rt.tv_sec--;
+ abs_rt.tv_nsec += 1000000000;
+ } else if (abs_rt.tv_nsec >= 1000000000) {
+ abs_rt.tv_sec++;
+ abs_rt.tv_nsec -= 1000000000;
+ }
+ abs_rt.tv_sec += abs_monotime_limit->tv_sec - curmono.tv_sec;
+
+/*
+ * RELATIVE - used on OpenBSD (might get a patch to get absolute monotime)
+ */
+#elif TIME_RELATIVE
+ struct timespec reltime;
+
+#define time_arg1 abs_monotime_limit
+#define time_arg2 &reltime
+#define time_prep \
+ clock_gettime(CLOCK_MONOTONIC, &reltime); \
+ reltime.tv_sec = abs_monotime_limit->tv_sec - reltime.tv_sec; \
+ reltime.tv_nsec = abs_monotime_limit->tv_nsec - reltime.tv_nsec; \
+ if (reltime.tv_nsec < 0) { \
+ reltime.tv_sec--; \
+ reltime.tv_nsec += 1000000000; \
+ }
+/*
+ * FreeBSD & Linux: absolute time re. CLOCK_MONOTONIC
+ */
+#else
+#define time_arg1 abs_monotime_limit
+#define time_arg2 NULL
+#define time_prep
+#endif
+
+ bool ret = true;
+ seqlock_val_t cur, cal;
+
+ seqlock_assert_valid(val);
+
+ wait_prep(sqlo);
+ cur = atomic_load_explicit(&sqlo->pos, memory_order_relaxed);
+
+ while (cur & SEQLOCK_HELD) {
+ cal = SEQLOCK_VAL(cur) - val - 1;
assert(cal < 0x40000000 || cal > 0xc0000000);
if (cal < 0x80000000)
break;
- wait_once(sqlo, cur);
+ if ((cur & SEQLOCK_WAITERS)
+ || atomic_compare_exchange_weak_explicit(
+ &sqlo->pos, &cur, cur | SEQLOCK_WAITERS,
+ memory_order_relaxed, memory_order_relaxed)) {
+ int rv;
+
+ time_prep
+
+ rv = wait_time(sqlo, cur | SEQLOCK_WAITERS, time_arg1,
+ time_arg2);
+ if (rv) {
+ ret = false;
+ break;
+ }
+ cur = atomic_load_explicit(&sqlo->pos,
+ memory_order_relaxed);
+ }
}
wait_done(sqlo);
+
+ return ret;
}
bool seqlock_check(struct seqlock *sqlo, seqlock_val_t val)
@@ -123,26 +252,32 @@ bool seqlock_check(struct seqlock *sqlo, seqlock_val_t val)
seqlock_assert_valid(val);
- cur = atomic_load_explicit(&sqlo->pos, memory_order_acquire);
- if (!(cur & 1))
+ cur = atomic_load_explicit(&sqlo->pos, memory_order_relaxed);
+ if (!(cur & SEQLOCK_HELD))
return 1;
- cur -= val;
+ cur = SEQLOCK_VAL(cur) - val - 1;
assert(cur < 0x40000000 || cur > 0xc0000000);
return cur < 0x80000000;
}
void seqlock_acquire_val(struct seqlock *sqlo, seqlock_val_t val)
{
+ seqlock_val_t prev;
+
seqlock_assert_valid(val);
- atomic_store_explicit(&sqlo->pos, val, memory_order_release);
- wait_poke(sqlo);
+ prev = atomic_exchange_explicit(&sqlo->pos, val, memory_order_relaxed);
+ if (prev & SEQLOCK_WAITERS)
+ wait_poke(sqlo);
}
void seqlock_release(struct seqlock *sqlo)
{
- atomic_store_explicit(&sqlo->pos, 0, memory_order_release);
- wait_poke(sqlo);
+ seqlock_val_t prev;
+
+ prev = atomic_exchange_explicit(&sqlo->pos, 0, memory_order_relaxed);
+ if (prev & SEQLOCK_WAITERS)
+ wait_poke(sqlo);
}
void seqlock_init(struct seqlock *sqlo)
@@ -154,14 +289,23 @@ void seqlock_init(struct seqlock *sqlo)
seqlock_val_t seqlock_cur(struct seqlock *sqlo)
{
- return atomic_load_explicit(&sqlo->pos, memory_order_acquire);
+ return SEQLOCK_VAL(atomic_load_explicit(&sqlo->pos,
+ memory_order_relaxed));
}
seqlock_val_t seqlock_bump(struct seqlock *sqlo)
{
- seqlock_val_t val;
+ seqlock_val_t val, cur;
+
+ cur = atomic_load_explicit(&sqlo->pos, memory_order_relaxed);
+ seqlock_assert_valid(cur);
+
+ do {
+ val = SEQLOCK_VAL(cur) + SEQLOCK_INCR;
+ } while (!atomic_compare_exchange_weak_explicit(&sqlo->pos, &cur, val,
+ memory_order_relaxed, memory_order_relaxed));
- val = atomic_fetch_add_explicit(&sqlo->pos, 2, memory_order_release);
- wait_poke(sqlo);
+ if (cur & SEQLOCK_WAITERS)
+ wait_poke(sqlo);
return val;
}
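
The generic (pthread_cond_timedwait) branch above converts an absolute
CLOCK_MONOTONIC deadline into CLOCK_REALTIME. A standalone sketch of that
conversion, with monotime_to_realtime() as a made-up helper name:

#include <time.h>

static void monotime_to_realtime(const struct timespec *mono_abs,
				 struct timespec *rt_abs)
{
	struct timespec mono_now, rt_now;

	/* sample both clocks at (nearly) the same instant; their difference
	 * is the offset between the two time bases
	 */
	clock_gettime(CLOCK_MONOTONIC, &mono_now);
	clock_gettime(CLOCK_REALTIME, &rt_now);

	rt_abs->tv_sec = rt_now.tv_sec + (mono_abs->tv_sec - mono_now.tv_sec);
	rt_abs->tv_nsec = rt_now.tv_nsec + (mono_abs->tv_nsec - mono_now.tv_nsec);
	if (rt_abs->tv_nsec < 0) {
		rt_abs->tv_sec--;
		rt_abs->tv_nsec += 1000000000;
	} else if (rt_abs->tv_nsec >= 1000000000) {
		rt_abs->tv_sec++;
		rt_abs->tv_nsec -= 1000000000;
	}
}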
diff --git a/lib/seqlock.h b/lib/seqlock.h
index eef05a4307..b551e3ffc4 100644
--- a/lib/seqlock.h
+++ b/lib/seqlock.h
@@ -54,12 +54,28 @@
*/
/* use sequentially increasing "ticket numbers". lowest bit will always
- * be 1 to have a 'cleared' indication (i.e., counts 1,3,5,7,etc. )
+ * be 1 to have a 'cleared' indication (i.e., counts 1,5,9,13,etc. )
+ * 2nd lowest bit is used to indicate we have waiters.
*/
typedef _Atomic uint32_t seqlock_ctr_t;
typedef uint32_t seqlock_val_t;
-#define seqlock_assert_valid(val) assert(val & 1)
+#define seqlock_assert_valid(val) assert((val) & SEQLOCK_HELD)
+/* NB: SEQLOCK_WAITERS is only allowed if SEQLOCK_HELD is also set; can't
+ * have waiters on an unheld seqlock
+ */
+#define SEQLOCK_HELD (1U << 0)
+#define SEQLOCK_WAITERS (1U << 1)
+#define SEQLOCK_VAL(n) ((n) & ~SEQLOCK_WAITERS)
+#define SEQLOCK_STARTVAL 1U
+#define SEQLOCK_INCR 4U
+
+/* TODO: originally, this was using "atomic_fetch_add", which is the reason
+ * bit 0 is used to indicate held state. With SEQLOCK_WAITERS added, there's
+ * no fetch_add anymore (cmpxchg loop instead), so we don't need to use bit 0
+ * for this anymore & can just special-case the value 0 for it and skip it in
+ * counting.
+ */
struct seqlock {
/* always used */
@@ -74,8 +90,16 @@ struct seqlock {
extern void seqlock_init(struct seqlock *sqlo);
-/* while (sqlo <= val) - wait until seqlock->pos > val, or seqlock unheld */
+/* basically: "while (sqlo <= val) wait();"
+ * returns when sqlo > val || !seqlock_held(sqlo)
+ */
extern void seqlock_wait(struct seqlock *sqlo, seqlock_val_t val);
+
+/* same, but time-limited (limit is an absolute CLOCK_MONOTONIC value) */
+extern bool seqlock_timedwait(struct seqlock *sqlo, seqlock_val_t val,
+ const struct timespec *abs_monotime_limit);
+
+/* one-shot test, returns true if seqlock_wait would return immediately */
extern bool seqlock_check(struct seqlock *sqlo, seqlock_val_t val);
static inline bool seqlock_held(struct seqlock *sqlo)
@@ -85,12 +109,20 @@ static inline bool seqlock_held(struct seqlock *sqlo)
/* sqlo - get seqlock position -- for the "counter" seqlock */
extern seqlock_val_t seqlock_cur(struct seqlock *sqlo);
-/* sqlo++ - note: like x++, returns previous value, before bumping */
+
+/* ++sqlo (but atomic & wakes waiters) - returns value that we bumped to.
+ *
+ * guarantees:
+ * - each seqlock_bump call bumps the position by exactly one SEQLOCK_INCR.
+ * There are no skipped/missed or multiple increments.
+ * - each return value is only returned from one seqlock_bump() call
+ */
extern seqlock_val_t seqlock_bump(struct seqlock *sqlo);
/* sqlo = val - can be used on held seqlock. */
extern void seqlock_acquire_val(struct seqlock *sqlo, seqlock_val_t val);
+
/* sqlo = ref - standard pattern: acquire relative to other seqlock */
static inline void seqlock_acquire(struct seqlock *sqlo, struct seqlock *ref)
{
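
A rough usage sketch of the "counter" seqlock semantics documented in this
header; producer_setup(), producer_publish() and consumer_wait_past() are
invented names:

#include "seqlock.h"

static struct seqlock position;

void producer_setup(void)
{
	seqlock_init(&position);
	seqlock_acquire_val(&position, SEQLOCK_STARTVAL);
}

void producer_publish(void)
{
	/* ... make the new state reachable first (atomics / ATOMLIST) ... */
	seqlock_bump(&position);	/* wakes anyone blocked in seqlock_wait() */
}

void consumer_wait_past(seqlock_val_t ticket)
{
	/* ticket must be a value observed via seqlock_cur()/seqlock_bump();
	 * returns once position > ticket, or once the seqlock is released
	 */
	seqlock_wait(&position, ticket);
}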
diff --git a/lib/sigevent.c b/lib/sigevent.c
index d02b074223..fcd85d0d43 100644
--- a/lib/sigevent.c
+++ b/lib/sigevent.c
@@ -24,7 +24,6 @@
#include <memory.h>
#include <lib_errors.h>
-#ifdef SA_SIGINFO
#ifdef HAVE_UCONTEXT_H
#ifdef GNU_LINUX
/* get REG_EIP from ucontext.h */
@@ -34,7 +33,6 @@
#endif /* GNU_LINUX */
#include <ucontext.h>
#endif /* HAVE_UCONTEXT_H */
-#endif /* SA_SIGINFO */
/* master signals descriptor struct */
@@ -158,8 +156,6 @@ static int signal_set(int signo)
return 0;
}
-#ifdef SA_SIGINFO
-
/* XXX This function should be enhanced to support more platforms
(it currently works only on Linux/x86). */
static void *program_counter(void *context)
@@ -199,41 +195,19 @@ static void *program_counter(void *context)
return NULL;
}
-#endif /* SA_SIGINFO */
-
static void __attribute__((noreturn))
-exit_handler(int signo
-#ifdef SA_SIGINFO
- ,
- siginfo_t *siginfo, void *context
-#endif
- )
+exit_handler(int signo, siginfo_t *siginfo, void *context)
{
-#ifndef SA_SIGINFO
- void *siginfo = NULL;
- void *pc = NULL;
-#else
void *pc = program_counter(context);
-#endif
zlog_signal(signo, "exiting...", siginfo, pc);
_exit(128 + signo);
}
static void __attribute__((noreturn))
-core_handler(int signo
-#ifdef SA_SIGINFO
- ,
- siginfo_t *siginfo, void *context
-#endif
- )
+core_handler(int signo, siginfo_t *siginfo, void *context)
{
-#ifndef SA_SIGINFO
- void *siginfo = NULL;
- void *pc = NULL;
-#else
void *pc = program_counter(context);
-#endif
/* make sure we don't hang in here. default for SIGALRM is terminate.
* - if we're in backtrace for more than a second, abort. */
@@ -290,12 +264,7 @@ static void trap_default_signals(void)
static const struct {
const int *sigs;
unsigned int nsigs;
- void (*handler)(int signo
-#ifdef SA_SIGINFO
- ,
- siginfo_t *info, void *context
-#endif
- );
+ void (*handler)(int signo, siginfo_t *info, void *context);
} sigmap[] = {
{core_signals, array_size(core_signals), core_handler},
{exit_signals, array_size(exit_signals), exit_handler},
@@ -316,15 +285,10 @@ static void trap_default_signals(void)
act.sa_handler = SIG_IGN;
act.sa_flags = 0;
} else {
-#ifdef SA_SIGINFO
/* Request extra arguments to signal
* handler. */
act.sa_sigaction = sigmap[i].handler;
act.sa_flags = SA_SIGINFO;
-#else
- act.sa_handler = sigmap[i].handler;
- act.sa_flags = 0;
-#endif
#ifdef SA_RESETHAND
/* don't try to print backtraces
* recursively */
diff --git a/lib/subdir.am b/lib/subdir.am
index 3754d270a7..2be7537bcc 100644
--- a/lib/subdir.am
+++ b/lib/subdir.am
@@ -21,6 +21,7 @@ lib_libfrr_la_SOURCES = \
lib/distribute.c \
lib/ferr.c \
lib/filter.c \
+ lib/frrcu.c \
lib/frrlua.c \
lib/frr_pthread.c \
lib/frrstr.c \
@@ -160,6 +161,7 @@ pkginclude_HEADERS += \
lib/frrlua.h \
lib/frr_pthread.h \
lib/frratomic.h \
+ lib/frrcu.h \
lib/frrstr.h \
lib/getopt.h \
lib/graph.h \
diff --git a/lib/thread.c b/lib/thread.c
index f862ce5eb0..5756ebc1f9 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -25,6 +25,7 @@
#include "thread.h"
#include "memory.h"
+#include "frrcu.h"
#include "log.h"
#include "hash.h"
#include "command.h"
@@ -737,6 +738,9 @@ static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
< 0) // effect a poll (return immediately)
timeout = 0;
+ rcu_read_unlock();
+ rcu_assert_read_unlocked();
+
/* add poll pipe poker */
assert(count + 1 < pfdsize);
pfds[count].fd = m->io_pipe[0];
@@ -750,6 +754,8 @@ static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
;
+ rcu_read_lock();
+
return num;
}
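
The fd_poll() change above follows a general rule: don't stay RCU-held
across a potentially unbounded blocking call, or the RCU sweeper stalls on
this thread's epoch. A minimal sketch of the pattern, with blocking_io() as
a placeholder:

#include "frrcu.h"

void blocking_io(void);			/* placeholder for poll() etc. */

void event_loop_iteration(void)
{
	/* event handlers run with RCU held (threads start at depth 1) */
	rcu_assert_read_locked();

	rcu_read_unlock();		/* drop our epoch before blocking ... */
	rcu_assert_read_unlocked();

	blocking_io();			/* may sleep for a long time */

	rcu_read_lock();		/* ... re-acquire before touching shared data */
}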