diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/frr_pthread.c | 15 | ||||
| -rw-r--r-- | lib/frr_pthread.h | 3 | ||||
| -rw-r--r-- | lib/frrcu.c | 527 | ||||
| -rw-r--r-- | lib/frrcu.h | 172 | ||||
| -rw-r--r-- | lib/libfrr.c | 2 | ||||
| -rw-r--r-- | lib/log.c | 4 | ||||
| -rw-r--r-- | lib/seqlock.c | 190 | ||||
| -rw-r--r-- | lib/seqlock.h | 40 | ||||
| -rw-r--r-- | lib/sigevent.c | 42 | ||||
| -rw-r--r-- | lib/subdir.am | 2 | ||||
| -rw-r--r-- | lib/thread.c | 6 |
11 files changed, 931 insertions, 72 deletions
diff --git a/lib/frr_pthread.c b/lib/frr_pthread.c index e588571c01..bdb6c2a397 100644 --- a/lib/frr_pthread.c +++ b/lib/frr_pthread.c @@ -133,18 +133,29 @@ int frr_pthread_set_name(struct frr_pthread *fpt) return ret; } +static void *frr_pthread_inner(void *arg) +{ + struct frr_pthread *fpt = arg; + + rcu_thread_start(fpt->rcu_thread); + return fpt->attr.start(fpt); +} + int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr) { int ret; - ret = pthread_create(&fpt->thread, attr, fpt->attr.start, fpt); + fpt->rcu_thread = rcu_thread_prepare(); + ret = pthread_create(&fpt->thread, attr, frr_pthread_inner, fpt); /* * Per pthread_create(3), the contents of fpt->thread are undefined if * pthread_create() did not succeed. Reset this value to zero. */ - if (ret < 0) + if (ret < 0) { + rcu_thread_unprepare(fpt->rcu_thread); memset(&fpt->thread, 0x00, sizeof(fpt->thread)); + } return ret; } diff --git a/lib/frr_pthread.h b/lib/frr_pthread.h index 3afe7ba966..6096a50370 100644 --- a/lib/frr_pthread.h +++ b/lib/frr_pthread.h @@ -23,6 +23,7 @@ #include <pthread.h> #include "frratomic.h" #include "memory.h" +#include "frrcu.h" #include "thread.h" #ifdef __cplusplus @@ -50,6 +51,8 @@ struct frr_pthread { /* pthread id */ pthread_t thread; + struct rcu_thread *rcu_thread; + /* thread master for this pthread's thread.c event loop */ struct thread_master *master; diff --git a/lib/frrcu.c b/lib/frrcu.c new file mode 100644 index 0000000000..7e6475b648 --- /dev/null +++ b/lib/frrcu.c @@ -0,0 +1,527 @@ +/* + * Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc. + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +/* implementation notes: this is an epoch-based RCU implementation. rcu_seq + * (global variable) counts the current epoch. Threads hold a specific epoch + * in rcu_read_lock(). This is the oldest epoch a thread might be accessing + * data from. + * + * The rcu_seq global is only pushed forward on rcu_read_lock() and + * rcu_read_unlock() calls. This makes things a tad more efficient since + * those are the only places it matters: + * - on rcu_read_lock, we don't want to hold an old epoch pointlessly + * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch + * when heading into a long idle period where no thread holds RCU + * + * rcu_thread structures themselves are RCU-free'd. + * + * rcu_head structures are the most iffy; normally for an ATOMLIST we would + * need to make sure we use rcu_free or pthread_rwlock to deallocate old items + * to prevent ABA or use-after-free problems. However, our ATOMLIST code + * guarantees that if the list remains non-empty in all cases, we only need + * the "last" pointer to do an "add_tail()", i.e. we can't run into ABA/UAF + * issues - but we do need to keep at least 1 item on the list. + * + * (Search the atomlist code for all uses of "last") + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <pthread.h> +#ifdef HAVE_PTHREAD_NP_H +#include <pthread_np.h> +#endif +#include <string.h> +#include <unistd.h> +#include <signal.h> + +#include "frrcu.h" +#include "seqlock.h" +#include "atomlist.h" + +DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread") +DEFINE_MTYPE_STATIC(LIB, RCU_NEXT, "RCU sequence barrier") + +DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head) + +PREDECL_ATOMLIST(rcu_threads) +struct rcu_thread { + struct rcu_threads_item head; + + struct rcu_head rcu_head; + + struct seqlock rcu; + + /* only accessed by thread itself, not atomic */ + unsigned depth; +}; +DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head) + +static const struct rcu_action rcua_next = { .type = RCUA_NEXT }; +static const struct rcu_action rcua_end = { .type = RCUA_END }; +static const struct rcu_action rcua_close = { .type = RCUA_CLOSE }; + +struct rcu_next { + struct rcu_head head_free; + struct rcu_head head_next; +}; + +#define rcu_free_internal(mtype, ptr, field) \ + do { \ + typeof(ptr) _ptr = (ptr); \ + struct rcu_head *_rcu_head = &_ptr->field; \ + static const struct rcu_action _rcu_action = { \ + .type = RCUA_FREE, \ + .u.free = { \ + .mt = mtype, \ + .offset = offsetof(typeof(*_ptr), field), \ + }, \ + }; \ + _rcu_head->action = &_rcu_action; \ + rcu_heads_add_tail(&rcu_heads, _rcu_head); \ + } while (0) + +/* primary global RCU position */ +static struct seqlock rcu_seq; +/* this is set to rcu_seq whenever something is added on the RCU queue. + * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step. + */ +static _Atomic seqlock_val_t rcu_dirty; + +static struct rcu_threads_head rcu_threads; +static struct rcu_heads_head rcu_heads; + +/* main thread & RCU sweeper have pre-setup rcu_thread structures. The + * reasons are different: + * + * - rcu_thread_main is there because the main thread isn't started like + * other threads, it's implicitly created when the program is started. So + * rcu_thread_main matches up implicitly. + * + * - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no + * sense really), it only exists so we can call RCU-using functions from + * the RCU thread without special handling in rcu_read_lock/unlock. + */ +static struct rcu_thread rcu_thread_main; +static struct rcu_thread rcu_thread_rcu; + +static pthread_t rcu_pthread; +static pthread_key_t rcu_thread_key; +static bool rcu_active; + +static void rcu_start(void); +static void rcu_bump(void); + +/* + * preinitialization for main thread + */ +static void rcu_thread_end(void *rcu_thread); + +static void rcu_preinit(void) __attribute__((constructor)); +static void rcu_preinit(void) +{ + struct rcu_thread *rt; + + rt = &rcu_thread_main; + rt->depth = 1; + seqlock_init(&rt->rcu); + seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL); + + pthread_key_create(&rcu_thread_key, rcu_thread_end); + pthread_setspecific(rcu_thread_key, rt); + + rcu_threads_add_tail(&rcu_threads, rt); + + /* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */ + rt = &rcu_thread_rcu; + rt->depth = 1; + + seqlock_init(&rcu_seq); + seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL); +} + +static struct rcu_thread *rcu_self(void) +{ + return (struct rcu_thread *)pthread_getspecific(rcu_thread_key); +} + +/* + * thread management (for the non-main thread) + */ +struct rcu_thread *rcu_thread_prepare(void) +{ + struct rcu_thread *rt, *cur; + + rcu_assert_read_locked(); + + if (!rcu_active) + rcu_start(); + + cur = rcu_self(); + assert(cur->depth); + + /* new thread always starts with rcu_read_lock held at depth 1, and + * holding the same epoch as the parent (this makes it possible to + * use RCU for things passed into the thread through its arg) + */ + rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt)); + rt->depth = 1; + + seqlock_init(&rt->rcu); + seqlock_acquire(&rt->rcu, &cur->rcu); + + rcu_threads_add_tail(&rcu_threads, rt); + + return rt; +} + +void rcu_thread_start(struct rcu_thread *rt) +{ + pthread_setspecific(rcu_thread_key, rt); +} + +void rcu_thread_unprepare(struct rcu_thread *rt) +{ + if (rt == &rcu_thread_rcu) + return; + + rt->depth = 1; + seqlock_acquire(&rt->rcu, &rcu_seq); + + rcu_bump(); + if (rt != &rcu_thread_main) + /* this free() happens after seqlock_release() below */ + rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head); + + rcu_threads_del(&rcu_threads, rt); + seqlock_release(&rt->rcu); +} + +static void rcu_thread_end(void *rtvoid) +{ + struct rcu_thread *rt = rtvoid; + rcu_thread_unprepare(rt); +} + +/* + * main RCU control aspects + */ + +static void rcu_bump(void) +{ + struct rcu_next *rn; + + rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn)); + + /* note: each RCUA_NEXT item corresponds to exactly one seqno bump. + * This means we don't need to communicate which seqno is which + * RCUA_NEXT, since we really don't care. + */ + + /* + * Important race condition: while rcu_heads_add_tail is executing, + * there is an intermediate point where the rcu_heads "last" pointer + * already points to rn->head_next, but rn->head_next isn't added to + * the list yet. That means any other "add_tail" calls append to this + * item, which isn't fully on the list yet. Freeze this thread at + * that point and look at another thread doing a rcu_bump. It adds + * these two items and then does a seqlock_bump. But the rcu_heads + * list is still "interrupted" and there's no RCUA_NEXT on the list + * yet (from either the frozen thread or the second thread). So + * rcu_main() might actually hit the end of the list at the + * "interrupt". + * + * This situation is prevented by requiring that rcu_read_lock is held + * for any calls to rcu_bump, since if we're holding the current RCU + * epoch, that means rcu_main can't be chewing on rcu_heads and hit + * that interruption point. Only by the time the thread has continued + * to rcu_read_unlock() - and therefore completed the add_tail - the + * RCU sweeper gobbles up the epoch and can be sure to find at least + * the RCUA_NEXT and RCUA_FREE items on rcu_heads. + */ + rn->head_next.action = &rcua_next; + rcu_heads_add_tail(&rcu_heads, &rn->head_next); + + /* free rn that we allocated above. + * + * This is INTENTIONALLY not built into the RCUA_NEXT action. This + * ensures that after the action above is popped off the queue, there + * is still at least 1 item on the RCU queue. This means we never + * delete the last item, which is extremely important since it keeps + * the atomlist ->last pointer alive and well. + * + * If we were to "run dry" on the RCU queue, add_tail may run into the + * "last item is being deleted - start over" case, and then we may end + * up accessing old RCU queue items that are already free'd. + */ + rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free); + + /* Only allow the RCU sweeper to run after these 2 items are queued. + * + * If another thread enqueues some RCU action in the intermediate + * window here, nothing bad happens - the queued action is associated + * with a larger seq# than strictly necessary. Thus, it might get + * executed a bit later, but that's not a problem. + * + * If another thread acquires the read lock in this window, it holds + * the previous epoch, but its RCU queue actions will be in the next + * epoch. This isn't a problem either, just a tad inefficient. + */ + seqlock_bump(&rcu_seq); +} + +static void rcu_bump_maybe(void) +{ + seqlock_val_t dirty; + + dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed); + /* no problem if we race here and multiple threads bump rcu_seq; + * bumping too much causes no issues while not bumping enough will + * result in delayed cleanup + */ + if (dirty == seqlock_cur(&rcu_seq)) + rcu_bump(); +} + +void rcu_read_lock(void) +{ + struct rcu_thread *rt = rcu_self(); + + assert(rt); + if (rt->depth++ > 0) + return; + + seqlock_acquire(&rt->rcu, &rcu_seq); + /* need to hold RCU for bump ... */ + rcu_bump_maybe(); + /* ... but no point in holding the old epoch if we just bumped */ + seqlock_acquire(&rt->rcu, &rcu_seq); +} + +void rcu_read_unlock(void) +{ + struct rcu_thread *rt = rcu_self(); + + assert(rt && rt->depth); + if (--rt->depth > 0) + return; + rcu_bump_maybe(); + seqlock_release(&rt->rcu); +} + +void rcu_assert_read_locked(void) +{ + struct rcu_thread *rt = rcu_self(); + assert(rt && rt->depth && seqlock_held(&rt->rcu)); +} + +void rcu_assert_read_unlocked(void) +{ + struct rcu_thread *rt = rcu_self(); + assert(rt && !rt->depth && !seqlock_held(&rt->rcu)); +} + +/* + * RCU resource-release thread + */ + +static void *rcu_main(void *arg); + +static void rcu_start(void) +{ + /* ensure we never handle signals on the RCU thread by blocking + * everything here (new thread inherits signal mask) + */ + sigset_t oldsigs, blocksigs; + + sigfillset(&blocksigs); + pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs); + + rcu_active = true; + + assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL)); + + pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); + +#ifdef HAVE_PTHREAD_SETNAME_NP +# ifdef GNU_LINUX + pthread_setname_np(rcu_pthread, "RCU sweeper"); +# elif defined(__NetBSD__) + pthread_setname_np(rcu_pthread, "RCU sweeper", NULL); +# endif +#elif defined(HAVE_PTHREAD_SET_NAME_NP) + pthread_set_name_np(rcu_pthread, "RCU sweeper"); +#endif +} + +static void rcu_do(struct rcu_head *rh) +{ + struct rcu_head_close *rhc; + void *p; + + switch (rh->action->type) { + case RCUA_FREE: + p = (char *)rh - rh->action->u.free.offset; + if (rh->action->u.free.mt) + qfree(rh->action->u.free.mt, p); + else + free(p); + break; + case RCUA_CLOSE: + rhc = container_of(rh, struct rcu_head_close, + rcu_head); + close(rhc->fd); + break; + case RCUA_CALL: + p = (char *)rh - rh->action->u.call.offset; + rh->action->u.call.fptr(p); + break; + + case RCUA_INVALID: + case RCUA_NEXT: + case RCUA_END: + default: + assert(0); + } +} + +static void rcu_watchdog(struct rcu_thread *rt) +{ +#if 0 + /* future work: print a backtrace for the thread that's holding up + * RCU. The only (good) way of doing that is to send a signal to the + * other thread, save away the backtrace in the signal handler, and + * block here until the signal is done processing. + * + * Just haven't implemented that yet. + */ + fprintf(stderr, "RCU watchdog %p\n", rt); +#endif +} + +static void *rcu_main(void *arg) +{ + struct rcu_thread *rt; + struct rcu_head *rh = NULL; + bool end = false; + struct timespec maxwait; + + seqlock_val_t rcuval = SEQLOCK_STARTVAL; + + pthread_setspecific(rcu_thread_key, &rcu_thread_rcu); + + while (!end) { + seqlock_wait(&rcu_seq, rcuval); + + /* RCU watchdog timeout, TODO: configurable value */ + clock_gettime(CLOCK_MONOTONIC, &maxwait); + maxwait.tv_nsec += 100 * 1000 * 1000; + if (maxwait.tv_nsec >= 1000000000) { + maxwait.tv_sec++; + maxwait.tv_nsec -= 1000000000; + } + + frr_each (rcu_threads, &rcu_threads, rt) + if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) { + rcu_watchdog(rt); + seqlock_wait(&rt->rcu, rcuval); + } + + while ((rh = rcu_heads_pop(&rcu_heads))) { + if (rh->action->type == RCUA_NEXT) + break; + else if (rh->action->type == RCUA_END) + end = true; + else + rcu_do(rh); + } + + rcuval += SEQLOCK_INCR; + } + + /* rcu_shutdown can only be called singlethreaded, and it does a + * pthread_join, so it should be impossible that anything ended up + * on the queue after RCUA_END + */ +#if 1 + assert(!rcu_heads_first(&rcu_heads)); +#else + while ((rh = rcu_heads_pop(&rcu_heads))) + if (rh->action->type >= RCUA_FREE) + rcu_do(rh); +#endif + return NULL; +} + +void rcu_shutdown(void) +{ + static struct rcu_head rcu_head_end; + struct rcu_thread *rt = rcu_self(); + void *retval; + + if (!rcu_active) + return; + + rcu_assert_read_locked(); + assert(rcu_threads_count(&rcu_threads) == 1); + + rcu_enqueue(&rcu_head_end, &rcua_end); + + rt->depth = 0; + seqlock_release(&rt->rcu); + seqlock_release(&rcu_seq); + rcu_active = false; + + /* clearing rcu_active is before pthread_join in case we hang in + * pthread_join & get a SIGTERM or something - in that case, just + * ignore the maybe-still-running RCU thread + */ + if (pthread_join(rcu_pthread, &retval) == 0) { + seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL); + seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL); + rt->depth = 1; + } +} + +/* + * RCU'd free functions + */ + +void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action) +{ + /* refer to rcu_bump() for why we need to hold RCU when adding items + * to rcu_heads + */ + rcu_assert_read_locked(); + + rh->action = action; + + if (!rcu_active) { + rcu_do(rh); + return; + } + rcu_heads_add_tail(&rcu_heads, rh); + atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq), + memory_order_relaxed); +} + +void rcu_close(struct rcu_head_close *rhc, int fd) +{ + rhc->fd = fd; + rcu_enqueue(&rhc->rcu_head, &rcua_close); +} diff --git a/lib/frrcu.h b/lib/frrcu.h new file mode 100644 index 0000000000..8f789303cc --- /dev/null +++ b/lib/frrcu.h @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc. + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef _FRRCU_H +#define _FRRCU_H + +#include "memory.h" +#include "atomlist.h" +#include "seqlock.h" + +/* quick RCU primer: + * There's a global sequence counter. Whenever a thread does a + * rcu_read_lock(), it is marked as holding the current sequence counter. + * When something is cleaned with RCU, the global sequence counter is + * increased and the item is queued for cleanup - *after* all threads are + * at a more recent sequence counter (or no sequence counter / unheld). + * + * So, by delaying resource cleanup, RCU ensures that things don't go away + * while another thread may hold a (stale) reference. + * + * Note that even if a thread is in rcu_read_lock(), it is invalid for that + * thread to access bits after rcu_free() & co on them. This is a design + * choice to allow no-op'ing out the entire RCU mechanism if we're running + * singlethreaded. (Also allows some optimization on the counter bumping.) + * + * differences from Linux Kernel RCU: + * - there's no rcu_synchronize(), if you really need to defer something + * use rcu_call() (and double check it's really necessary) + * - rcu_dereference() and rcu_assign_pointer() don't exist, use atomic_* + * instead (ATOM* list structures do the right thing) + */ + +/* opaque */ +struct rcu_thread; + +/* called before new thread creation, sets up rcu thread info for new thread + * before it actually exits. This ensures possible RCU references are held + * for thread startup. + * + * return value must be passed into the new thread's call to rcu_thread_start() + */ +extern struct rcu_thread *rcu_thread_prepare(void); + +/* cleanup in case pthread_create() fails */ +extern void rcu_thread_unprepare(struct rcu_thread *rcu_thread); + +/* called early in the new thread, with the return value from the above. + * NB: new thread is initially in RCU-held state! (at depth 1) + * + * TBD: maybe inherit RCU state from rcu_thread_prepare()? + */ +extern void rcu_thread_start(struct rcu_thread *rcu_thread); + +/* thread exit is handled through pthread_key_create's destructor function */ + +/* global RCU shutdown - must be called with only 1 active thread left. waits + * until remaining RCU actions are done & RCU thread has exited. + * + * This is mostly here to get a clean exit without memleaks. + */ +extern void rcu_shutdown(void); + +/* enter / exit RCU-held state. counter-based, so can be called nested. */ +extern void rcu_read_lock(void); +extern void rcu_read_unlock(void); + +/* for debugging / safety checks */ +extern void rcu_assert_read_locked(void); +extern void rcu_assert_read_unlocked(void); + +enum rcu_action_type { + RCUA_INVALID = 0, + /* used internally by the RCU code, shouldn't ever show up outside */ + RCUA_NEXT, + RCUA_END, + /* normal RCU actions, for outside use */ + RCUA_FREE, + RCUA_CLOSE, + RCUA_CALL, +}; + +/* since rcu_head is intended to be embedded into structs which may exist + * with lots of copies, rcu_head is shrunk down to its absolute minimum - + * the atomlist pointer + a pointer to this action struct. + */ +struct rcu_action { + enum rcu_action_type type; + + union { + struct { + struct memtype *mt; + ptrdiff_t offset; + } free; + + struct { + void (*fptr)(void *arg); + ptrdiff_t offset; + } call; + } u; +}; + +/* RCU cleanup function queue item */ +PREDECL_ATOMLIST(rcu_heads) +struct rcu_head { + struct rcu_heads_item head; + const struct rcu_action *action; +}; + +/* special RCU head for delayed fd-close */ +struct rcu_head_close { + struct rcu_head rcu_head; + int fd; +}; + +/* enqueue RCU action - use the macros below to get the rcu_action set up */ +extern void rcu_enqueue(struct rcu_head *head, const struct rcu_action *action); + +/* RCU free() and file close() operations. + * + * freed memory / closed fds become _immediately_ unavailable to the calling + * thread, but will remain available for other threads until they have passed + * into RCU-released state. + */ + +/* may be called with NULL mt to do non-MTYPE free() */ +#define rcu_free(mtype, ptr, field) \ + do { \ + typeof(ptr) _ptr = (ptr); \ + struct rcu_head *_rcu_head = &_ptr->field; \ + static const struct rcu_action _rcu_action = { \ + .type = RCUA_FREE, \ + .u.free = { \ + .mt = mtype, \ + .offset = offsetof(typeof(*_ptr), field), \ + }, \ + }; \ + rcu_enqueue(_rcu_head, &_rcu_action); \ + } while (0) + +/* use this sparingly, it runs on (and blocks) the RCU thread */ +#define rcu_call(func, ptr, field) \ + do { \ + typeof(ptr) _ptr = (ptr); \ + void (*fptype)(typeof(ptr)); \ + struct rcu_head *_rcu_head = &_ptr->field; \ + static const struct rcu_action _rcu_action = { \ + .type = RCUA_CALL, \ + .u.call = { \ + .fptr = (void *)func, \ + .offset = offsetof(typeof(*_ptr), field), \ + }, \ + }; \ + (void)(_fptype = func); \ + rcu_enqueue(_rcu_head, &_rcu_action); \ + } while (0) + +extern void rcu_close(struct rcu_head_close *head, int fd); + +#endif /* _FRRCU_H */ diff --git a/lib/libfrr.c b/lib/libfrr.c index 0fc321d6e0..35c6092140 100644 --- a/lib/libfrr.c +++ b/lib/libfrr.c @@ -41,6 +41,7 @@ #include "northbound_cli.h" #include "northbound_db.h" #include "debug.h" +#include "frrcu.h" DEFINE_HOOK(frr_late_init, (struct thread_master * tm), (tm)) DEFINE_KOOH(frr_early_fini, (), ()) @@ -1081,6 +1082,7 @@ void frr_fini(void) master = NULL; closezlog(); /* frrmod_init -> nothing needed / hooks */ + rcu_shutdown(); if (!debug_memstats_at_exit) return; @@ -559,9 +559,7 @@ static void crash_write(struct fbuf *fb, char *msgstart) void zlog_signal(int signo, const char *action, void *siginfo_v, void *program_counter) { -#ifdef SA_SIGINFO siginfo_t *siginfo = siginfo_v; -#endif time_t now; char buf[sizeof("DEFAULT: Received signal S at T (si_addr 0xP, PC 0xP); aborting...") + 100]; @@ -575,7 +573,6 @@ void zlog_signal(int signo, const char *action, void *siginfo_v, msgstart = fb.pos; bprintfrr(&fb, "Received signal %d at %lld", signo, (long long)now); -#ifdef SA_SIGINFO if (program_counter) bprintfrr(&fb, " (si_addr 0x%tx, PC 0x%tx)", (ptrdiff_t)siginfo->si_addr, @@ -583,7 +580,6 @@ void zlog_signal(int signo, const char *action, void *siginfo_v, else bprintfrr(&fb, " (si_addr 0x%tx)", (ptrdiff_t)siginfo->si_addr); -#endif /* SA_SIGINFO */ bprintfrr(&fb, "; %s\n", action); crash_write(&fb, msgstart); diff --git a/lib/seqlock.c b/lib/seqlock.c index 223d14952c..c05ec19db4 100644 --- a/lib/seqlock.c +++ b/lib/seqlock.c @@ -25,6 +25,7 @@ #include "config.h" #endif +#include <string.h> #include <unistd.h> #include <limits.h> #include <errno.h> @@ -35,44 +36,75 @@ #include "seqlock.h" +/**************************************** + * OS specific synchronization wrappers * + ****************************************/ + +/* + * Linux: sys_futex() + */ #ifdef HAVE_SYNC_LINUX_FUTEX -/* Linux-specific - sys_futex() */ #include <sys/syscall.h> #include <linux/futex.h> -static long sys_futex(void *addr1, int op, int val1, struct timespec *timeout, - void *addr2, int val3) +static long sys_futex(void *addr1, int op, int val1, + const struct timespec *timeout, void *addr2, int val3) { return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); } #define wait_once(sqlo, val) \ sys_futex((int *)&sqlo->pos, FUTEX_WAIT, (int)val, NULL, NULL, 0) +#define wait_time(sqlo, val, time, reltime) \ + sys_futex((int *)&sqlo->pos, FUTEX_WAIT_BITSET, (int)val, time, \ + NULL, ~0U) #define wait_poke(sqlo) \ sys_futex((int *)&sqlo->pos, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) +/* + * OpenBSD: sys_futex(), almost the same as on Linux + */ #elif defined(HAVE_SYNC_OPENBSD_FUTEX) -/* OpenBSD variant of the above. untested, not upstream in OpenBSD. */ #include <sys/syscall.h> #include <sys/futex.h> +#define TIME_RELATIVE 1 + #define wait_once(sqlo, val) \ futex((int *)&sqlo->pos, FUTEX_WAIT, (int)val, NULL, NULL, 0) +#define wait_time(sqlo, val, time, reltime) \ + futex((int *)&sqlo->pos, FUTEX_WAIT, (int)val, reltime, NULL, 0) #define wait_poke(sqlo) \ futex((int *)&sqlo->pos, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) +/* + * FreeBSD: _umtx_op() + */ #elif defined(HAVE_SYNC_UMTX_OP) -/* FreeBSD-specific: umtx_op() */ #include <sys/umtx.h> #define wait_once(sqlo, val) \ _umtx_op((void *)&sqlo->pos, UMTX_OP_WAIT_UINT, val, NULL, NULL) +static int wait_time(struct seqlock *sqlo, uint32_t val, + const struct timespec *abstime, + const struct timespec *reltime) +{ + struct _umtx_time t; + t._flags = UMTX_ABSTIME; + t._clockid = CLOCK_MONOTONIC; + memcpy(&t._timeout, abstime, sizeof(t._timeout)); + return _umtx_op((void *)&sqlo->pos, UMTX_OP_WAIT_UINT, val, + (void *)(uintptr_t) sizeof(t), &t); +} #define wait_poke(sqlo) \ _umtx_op((void *)&sqlo->pos, UMTX_OP_WAKE, INT_MAX, NULL, NULL) -#else -/* generic version. used on *BSD, Solaris and OSX. +/* + * generic version. used on NetBSD, Solaris and OSX. really shitty. */ +#else + +#define TIME_ABS_REALTIME 1 #define wait_init(sqlo) do { \ pthread_mutex_init(&sqlo->lock, NULL); \ @@ -80,6 +112,9 @@ static long sys_futex(void *addr1, int op, int val1, struct timespec *timeout, } while (0) #define wait_prep(sqlo) pthread_mutex_lock(&sqlo->lock) #define wait_once(sqlo, val) pthread_cond_wait(&sqlo->wake, &sqlo->lock) +#define wait_time(sqlo, val, time, reltime) \ + pthread_cond_timedwait(&sqlo->wake, \ + &sqlo->lock, time); #define wait_done(sqlo) pthread_mutex_unlock(&sqlo->lock) #define wait_poke(sqlo) do { \ pthread_mutex_lock(&sqlo->lock); \ @@ -103,18 +138,112 @@ void seqlock_wait(struct seqlock *sqlo, seqlock_val_t val) seqlock_assert_valid(val); wait_prep(sqlo); - while (1) { - cur = atomic_load_explicit(&sqlo->pos, memory_order_acquire); - if (!(cur & 1)) + cur = atomic_load_explicit(&sqlo->pos, memory_order_relaxed); + + while (cur & SEQLOCK_HELD) { + cal = SEQLOCK_VAL(cur) - val - 1; + assert(cal < 0x40000000 || cal > 0xc0000000); + if (cal < 0x80000000) break; - cal = cur - val - 1; + + if ((cur & SEQLOCK_WAITERS) + || atomic_compare_exchange_weak_explicit( + &sqlo->pos, &cur, cur | SEQLOCK_WAITERS, + memory_order_relaxed, memory_order_relaxed)) { + wait_once(sqlo, cur | SEQLOCK_WAITERS); + cur = atomic_load_explicit(&sqlo->pos, + memory_order_relaxed); + } + /* else: we failed to swap in cur because it just changed */ + } + wait_done(sqlo); +} + +bool seqlock_timedwait(struct seqlock *sqlo, seqlock_val_t val, + const struct timespec *abs_monotime_limit) +{ +/* + * ABS_REALTIME - used on NetBSD, Solaris and OSX + */ +#if TIME_ABS_REALTIME +#define time_arg1 &abs_rt +#define time_arg2 NULL +#define time_prep + struct timespec curmono, abs_rt; + + clock_gettime(CLOCK_MONOTONIC, &curmono); + clock_gettime(CLOCK_REALTIME, &abs_rt); + + abs_rt.tv_nsec += abs_monotime_limit->tv_nsec - curmono.tv_nsec; + if (abs_rt.tv_nsec < 0) { + abs_rt.tv_sec--; + abs_rt.tv_nsec += 1000000000; + } else if (abs_rt.tv_nsec >= 1000000000) { + abs_rt.tv_sec++; + abs_rt.tv_nsec -= 1000000000; + } + abs_rt.tv_sec += abs_monotime_limit->tv_sec - curmono.tv_sec; + +/* + * RELATIVE - used on OpenBSD (might get a patch to get absolute monotime) + */ +#elif TIME_RELATIVE + struct timespec reltime; + +#define time_arg1 abs_monotime_limit +#define time_arg2 &reltime +#define time_prep \ + clock_gettime(CLOCK_MONOTONIC, &reltime); \ + reltime.tv_sec = abs_monotime_limit.tv_sec - reltime.tv_sec; \ + reltime.tv_nsec = abs_monotime_limit.tv_nsec - reltime.tv_nsec; \ + if (reltime.tv_nsec < 0) { \ + reltime.tv_sec--; \ + reltime.tv_nsec += 1000000000; \ + } +/* + * FreeBSD & Linux: absolute time re. CLOCK_MONOTONIC + */ +#else +#define time_arg1 abs_monotime_limit +#define time_arg2 NULL +#define time_prep +#endif + + bool ret = true; + seqlock_val_t cur, cal; + + seqlock_assert_valid(val); + + wait_prep(sqlo); + cur = atomic_load_explicit(&sqlo->pos, memory_order_relaxed); + + while (cur & SEQLOCK_HELD) { + cal = SEQLOCK_VAL(cur) - val - 1; assert(cal < 0x40000000 || cal > 0xc0000000); if (cal < 0x80000000) break; - wait_once(sqlo, cur); + if ((cur & SEQLOCK_WAITERS) + || atomic_compare_exchange_weak_explicit( + &sqlo->pos, &cur, cur | SEQLOCK_WAITERS, + memory_order_relaxed, memory_order_relaxed)) { + int rv; + + time_prep + + rv = wait_time(sqlo, cur | SEQLOCK_WAITERS, time_arg1, + time_arg2); + if (rv) { + ret = false; + break; + } + cur = atomic_load_explicit(&sqlo->pos, + memory_order_relaxed); + } } wait_done(sqlo); + + return ret; } bool seqlock_check(struct seqlock *sqlo, seqlock_val_t val) @@ -123,26 +252,32 @@ bool seqlock_check(struct seqlock *sqlo, seqlock_val_t val) seqlock_assert_valid(val); - cur = atomic_load_explicit(&sqlo->pos, memory_order_acquire); - if (!(cur & 1)) + cur = atomic_load_explicit(&sqlo->pos, memory_order_relaxed); + if (!(cur & SEQLOCK_HELD)) return 1; - cur -= val; + cur = SEQLOCK_VAL(cur) - val - 1; assert(cur < 0x40000000 || cur > 0xc0000000); return cur < 0x80000000; } void seqlock_acquire_val(struct seqlock *sqlo, seqlock_val_t val) { + seqlock_val_t prev; + seqlock_assert_valid(val); - atomic_store_explicit(&sqlo->pos, val, memory_order_release); - wait_poke(sqlo); + prev = atomic_exchange_explicit(&sqlo->pos, val, memory_order_relaxed); + if (prev & SEQLOCK_WAITERS) + wait_poke(sqlo); } void seqlock_release(struct seqlock *sqlo) { - atomic_store_explicit(&sqlo->pos, 0, memory_order_release); - wait_poke(sqlo); + seqlock_val_t prev; + + prev = atomic_exchange_explicit(&sqlo->pos, 0, memory_order_relaxed); + if (prev & SEQLOCK_WAITERS) + wait_poke(sqlo); } void seqlock_init(struct seqlock *sqlo) @@ -154,14 +289,23 @@ void seqlock_init(struct seqlock *sqlo) seqlock_val_t seqlock_cur(struct seqlock *sqlo) { - return atomic_load_explicit(&sqlo->pos, memory_order_acquire); + return SEQLOCK_VAL(atomic_load_explicit(&sqlo->pos, + memory_order_relaxed)); } seqlock_val_t seqlock_bump(struct seqlock *sqlo) { - seqlock_val_t val; + seqlock_val_t val, cur; + + cur = atomic_load_explicit(&sqlo->pos, memory_order_relaxed); + seqlock_assert_valid(cur); + + do { + val = SEQLOCK_VAL(cur) + SEQLOCK_INCR; + } while (!atomic_compare_exchange_weak_explicit(&sqlo->pos, &cur, val, + memory_order_relaxed, memory_order_relaxed)); - val = atomic_fetch_add_explicit(&sqlo->pos, 2, memory_order_release); - wait_poke(sqlo); + if (cur & SEQLOCK_WAITERS) + wait_poke(sqlo); return val; } diff --git a/lib/seqlock.h b/lib/seqlock.h index eef05a4307..b551e3ffc4 100644 --- a/lib/seqlock.h +++ b/lib/seqlock.h @@ -54,12 +54,28 @@ */ /* use sequentially increasing "ticket numbers". lowest bit will always - * be 1 to have a 'cleared' indication (i.e., counts 1,3,5,7,etc. ) + * be 1 to have a 'cleared' indication (i.e., counts 1,5,9,13,etc. ) + * 2nd lowest bit is used to indicate we have waiters. */ typedef _Atomic uint32_t seqlock_ctr_t; typedef uint32_t seqlock_val_t; -#define seqlock_assert_valid(val) assert(val & 1) +#define seqlock_assert_valid(val) assert((val) & SEQLOCK_HELD) +/* NB: SEQLOCK_WAITERS is only allowed if SEQLOCK_HELD is also set; can't + * have waiters on an unheld seqlock + */ +#define SEQLOCK_HELD (1U << 0) +#define SEQLOCK_WAITERS (1U << 1) +#define SEQLOCK_VAL(n) ((n) & ~SEQLOCK_WAITERS) +#define SEQLOCK_STARTVAL 1U +#define SEQLOCK_INCR 4U + +/* TODO: originally, this was using "atomic_fetch_add", which is the reason + * bit 0 is used to indicate held state. With SEQLOCK_WAITERS added, there's + * no fetch_add anymore (cmpxchg loop instead), so we don't need to use bit 0 + * for this anymore & can just special-case the value 0 for it and skip it in + * counting. + */ struct seqlock { /* always used */ @@ -74,8 +90,16 @@ struct seqlock { extern void seqlock_init(struct seqlock *sqlo); -/* while (sqlo <= val) - wait until seqlock->pos > val, or seqlock unheld */ +/* basically: "while (sqlo <= val) wait();" + * returns when sqlo > val || !seqlock_held(sqlo) + */ extern void seqlock_wait(struct seqlock *sqlo, seqlock_val_t val); + +/* same, but time-limited (limit is an absolute CLOCK_MONOTONIC value) */ +extern bool seqlock_timedwait(struct seqlock *sqlo, seqlock_val_t val, + const struct timespec *abs_monotime_limit); + +/* one-shot test, returns true if seqlock_wait would return immediately */ extern bool seqlock_check(struct seqlock *sqlo, seqlock_val_t val); static inline bool seqlock_held(struct seqlock *sqlo) @@ -85,12 +109,20 @@ static inline bool seqlock_held(struct seqlock *sqlo) /* sqlo - get seqlock position -- for the "counter" seqlock */ extern seqlock_val_t seqlock_cur(struct seqlock *sqlo); -/* sqlo++ - note: like x++, returns previous value, before bumping */ + +/* ++sqlo (but atomic & wakes waiters) - returns value that we bumped to. + * + * guarantees: + * - each seqlock_bump call bumps the position by exactly one SEQLOCK_INCR. + * There are no skipped/missed or multiple increments. + * - each return value is only returned from one seqlock_bump() call + */ extern seqlock_val_t seqlock_bump(struct seqlock *sqlo); /* sqlo = val - can be used on held seqlock. */ extern void seqlock_acquire_val(struct seqlock *sqlo, seqlock_val_t val); + /* sqlo = ref - standard pattern: acquire relative to other seqlock */ static inline void seqlock_acquire(struct seqlock *sqlo, struct seqlock *ref) { diff --git a/lib/sigevent.c b/lib/sigevent.c index d02b074223..fcd85d0d43 100644 --- a/lib/sigevent.c +++ b/lib/sigevent.c @@ -24,7 +24,6 @@ #include <memory.h> #include <lib_errors.h> -#ifdef SA_SIGINFO #ifdef HAVE_UCONTEXT_H #ifdef GNU_LINUX /* get REG_EIP from ucontext.h */ @@ -34,7 +33,6 @@ #endif /* GNU_LINUX */ #include <ucontext.h> #endif /* HAVE_UCONTEXT_H */ -#endif /* SA_SIGINFO */ /* master signals descriptor struct */ @@ -158,8 +156,6 @@ static int signal_set(int signo) return 0; } -#ifdef SA_SIGINFO - /* XXX This function should be enhanced to support more platforms (it currently works only on Linux/x86). */ static void *program_counter(void *context) @@ -199,41 +195,19 @@ static void *program_counter(void *context) return NULL; } -#endif /* SA_SIGINFO */ - static void __attribute__((noreturn)) -exit_handler(int signo -#ifdef SA_SIGINFO - , - siginfo_t *siginfo, void *context -#endif - ) +exit_handler(int signo, siginfo_t *siginfo, void *context) { -#ifndef SA_SIGINFO - void *siginfo = NULL; - void *pc = NULL; -#else void *pc = program_counter(context); -#endif zlog_signal(signo, "exiting...", siginfo, pc); _exit(128 + signo); } static void __attribute__((noreturn)) -core_handler(int signo -#ifdef SA_SIGINFO - , - siginfo_t *siginfo, void *context -#endif - ) +core_handler(int signo, siginfo_t *siginfo, void *context) { -#ifndef SA_SIGINFO - void *siginfo = NULL; - void *pc = NULL; -#else void *pc = program_counter(context); -#endif /* make sure we don't hang in here. default for SIGALRM is terminate. * - if we're in backtrace for more than a second, abort. */ @@ -290,12 +264,7 @@ static void trap_default_signals(void) static const struct { const int *sigs; unsigned int nsigs; - void (*handler)(int signo -#ifdef SA_SIGINFO - , - siginfo_t *info, void *context -#endif - ); + void (*handler)(int signo, siginfo_t *info, void *context); } sigmap[] = { {core_signals, array_size(core_signals), core_handler}, {exit_signals, array_size(exit_signals), exit_handler}, @@ -316,15 +285,10 @@ static void trap_default_signals(void) act.sa_handler = SIG_IGN; act.sa_flags = 0; } else { -#ifdef SA_SIGINFO /* Request extra arguments to signal * handler. */ act.sa_sigaction = sigmap[i].handler; act.sa_flags = SA_SIGINFO; -#else - act.sa_handler = sigmap[i].handler; - act.sa_flags = 0; -#endif #ifdef SA_RESETHAND /* don't try to print backtraces * recursively */ diff --git a/lib/subdir.am b/lib/subdir.am index 3754d270a7..2be7537bcc 100644 --- a/lib/subdir.am +++ b/lib/subdir.am @@ -21,6 +21,7 @@ lib_libfrr_la_SOURCES = \ lib/distribute.c \ lib/ferr.c \ lib/filter.c \ + lib/frrcu.c \ lib/frrlua.c \ lib/frr_pthread.c \ lib/frrstr.c \ @@ -160,6 +161,7 @@ pkginclude_HEADERS += \ lib/frrlua.h \ lib/frr_pthread.h \ lib/frratomic.h \ + lib/frrcu.h \ lib/frrstr.h \ lib/getopt.h \ lib/graph.h \ diff --git a/lib/thread.c b/lib/thread.c index f862ce5eb0..5756ebc1f9 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -25,6 +25,7 @@ #include "thread.h" #include "memory.h" +#include "frrcu.h" #include "log.h" #include "hash.h" #include "command.h" @@ -737,6 +738,9 @@ static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, < 0) // effect a poll (return immediately) timeout = 0; + rcu_read_unlock(); + rcu_assert_read_unlocked(); + /* add poll pipe poker */ assert(count + 1 < pfdsize); pfds[count].fd = m->io_pipe[0]; @@ -750,6 +754,8 @@ static int fd_poll(struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0) ; + rcu_read_lock(); + return num; } |
