From: Quentin Young Date: Wed, 10 May 2017 18:09:49 +0000 (+0000) Subject: lib: allow pthreads to poke poll() X-Git-Tag: reindent-master-before~105 X-Git-Url: https://git.puffer.fish/?a=commitdiff_plain;h=3bf2673b3094727edee168f4b5a93095420f6b23;p=matthieu%2Ffrr.git lib: allow pthreads to poke poll() When scheduling a task onto a thread master owned by another pthread, we need to lock the thread master's mutex. However, if the pthread which owns that thread master is in poll(), we could be stuck waiting for a very long time. To solve this, we copy all data poll() needs and unlock during poll(). To break the target pthread out of poll(), thread_master has gained a pipe whose reading end is passed into poll(). After an event that requires immediate action by the target pthread, a byte is written into the pipe in order to wake it up. Signed-off-by: Quentin Young [DL: split off from select() removal] --- diff --git a/lib/thread.c b/lib/thread.c index 7f58aea789..4077358dcf 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -30,6 +30,7 @@ #include "pqueue.h" #include "command.h" #include "sigevent.h" +#include "network.h" DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread") DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master") @@ -40,6 +41,12 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats") #include #endif +#define AWAKEN(m) \ + do { \ + static unsigned char wakebyte = 0x01; \ + write (m->io_pipe[1], &wakebyte, 1); \ + } while (0); + static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER; static struct hash *cpu_record = NULL; @@ -378,6 +385,8 @@ thread_master_create (void) rv->spin = true; rv->handle_signals = true; rv->owner = pthread_self(); + pipe (rv->io_pipe); + set_nonblocking (rv->io_pipe[0]); rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; @@ -530,6 +539,8 @@ thread_master_free (struct thread_master *m) thread_list_free (m, &m->unuse); thread_queue_free (m, m->background); pthread_mutex_destroy (&m->mtx); + close (m->io_pipe[0]); + close (m->io_pipe[1]); XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); XFREE (MTYPE_THREAD_MASTER, m); @@ -635,23 +646,20 @@ static int fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, nfds_t count, struct timeval *timer_wait) { - int num; - - /* If timer_wait is null here, that means either select() or poll() should - * block indefinitely, unless the thread_master has overriden it. select() - * and poll() differ in the timeout values they interpret as an indefinite - * block; select() requires a null pointer, while poll takes a millisecond - * value of -1. - * - * The thread_master owner has the option of overriding the default behavior - * by setting ->selectpoll_timeout. If the value is positive, it specifies - * the maximum number of milliseconds to wait. If the timeout is -1, it - * specifies that we should never wait and always return immediately even if - * no event is detected. If the value is zero, the behavior is default. - */ + if (count == 0) + return 0; + /* If timer_wait is null here, that means poll() should block indefinitely, + * unless the thread_master has overriden it by setting ->selectpoll_timeout. + * If the value is positive, it specifies the maximum number of milliseconds + * to wait. If the timeout is -1, it specifies that we should never wait and + * always return immediately even if no event is detected. If the value is + * zero, the behavior is default. */ int timeout = -1; + /* number of file descriptors with events */ + int num; + if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000); else if (m->selectpoll_timeout > 0) // use the user's timeout @@ -659,7 +667,17 @@ fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, else if (m->selectpoll_timeout < 0) // effect a poll (return immediately) timeout = 0; - num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout); + /* add poll pipe poker */ + assert (count + 1 < pfdsize); + pfds[count].fd = m->io_pipe[0]; + pfds[count].events = POLLIN; + pfds[count].revents = 0x00; + + num = poll (pfds, count + 1, timeout); + + static unsigned char trash[64]; + if (num > 0 && pfds[count].revents != 0 && num--) + while (read (m->io_pipe[0], &trash, sizeof (trash)) > 0); return num; } @@ -691,7 +709,8 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, break; } - assert (queuepos < m->handler.pfdsize); + /* make sure we have room for this fd + pipe poker fd */ + assert (queuepos + 1 < m->handler.pfdsize); thread = thread_get (m, dir, func, arg, debugargpass); @@ -761,6 +780,8 @@ funcname_thread_add_timer_timeval (struct thread_master *m, } } pthread_mutex_unlock (&thread->mtx); + + AWAKEN (m); } pthread_mutex_unlock (&m->mtx); @@ -868,6 +889,8 @@ funcname_thread_add_event (struct thread_master *m, *t_ptr = thread; thread->ref = t_ptr; } + + AWAKEN (m); } pthread_mutex_unlock (&m->mtx); @@ -1205,7 +1228,16 @@ thread_fetch (struct thread_master *m, struct thread *fetch) timer_wait = &timer_val; } - num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait); + /* copy pollfds so we can unlock during blocking calls to poll() */ + struct pollfd pfds[m->handler.pfdsize]; + unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp; + memcpy (pfds, m->handler.pfds, count * sizeof (struct pollfd)); + + pthread_mutex_unlock (&m->mtx); + { + num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait); + } + pthread_mutex_lock (&m->mtx); /* Signals should get quick treatment */ if (num < 0) diff --git a/lib/thread.h b/lib/thread.h index e40bf64855..753aa41ffd 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -66,6 +66,7 @@ struct thread_master struct thread_list ready; struct thread_list unuse; struct pqueue *background; + int io_pipe[2]; int fd_limit; struct fd_handler handler; unsigned long alloc;