]> git.puffer.fish Git - matthieu/frr.git/commitdiff
lib: allow pthreads to poke poll()
authorQuentin Young <qlyoung@cumulusnetworks.com>
Wed, 10 May 2017 18:09:49 +0000 (18:09 +0000)
committerQuentin Young <qlyoung@users.noreply.github.com>
Tue, 30 May 2017 14:27:10 +0000 (10:27 -0400)
When scheduling a task onto a thread master owned by another pthread, we
need to lock the thread master's mutex. However, if the pthread which
owns that thread master is in poll(), we could be stuck waiting for a
very long time. To solve this, we copy all data poll() needs and unlock
during poll(). To break the target pthread out of poll(), thread_master
has gained a pipe whose reading end is passed into poll(). After an event
that requires immediate action by the target pthread, a byte is written
into the pipe in order to wake it up.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
[DL: split off from select() removal]

lib/thread.c
lib/thread.h

index 7f58aea789a3d11e8235f1e3cec7d4cfdc01e9d1..4077358dcfbd34be4d3e38fb33448d2ebb909aef 100644 (file)
@@ -30,6 +30,7 @@
 #include "pqueue.h"
 #include "command.h"
 #include "sigevent.h"
+#include "network.h"
 
 DEFINE_MTYPE_STATIC(LIB, THREAD,        "Thread")
 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
@@ -40,6 +41,12 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS,  "Thread stats")
 #include <mach/mach_time.h>
 #endif
 
+#define AWAKEN(m) \
+  do { \
+      static unsigned char wakebyte = 0x01; \
+      write (m->io_pipe[1], &wakebyte, 1); \
+  } while (0);
+
 static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER;
 static struct hash *cpu_record = NULL;
 
@@ -378,6 +385,8 @@ thread_master_create (void)
   rv->spin = true;
   rv->handle_signals = true;
   rv->owner = pthread_self();
+  pipe (rv->io_pipe);
+  set_nonblocking (rv->io_pipe[0]);
 
   rv->handler.pfdsize = rv->fd_limit;
   rv->handler.pfdcount = 0;
@@ -530,6 +539,8 @@ thread_master_free (struct thread_master *m)
   thread_list_free (m, &m->unuse);
   thread_queue_free (m, m->background);
   pthread_mutex_destroy (&m->mtx);
+  close (m->io_pipe[0]);
+  close (m->io_pipe[1]);
 
   XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
   XFREE (MTYPE_THREAD_MASTER, m);
@@ -635,23 +646,20 @@ static int
 fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
          nfds_t count, struct timeval *timer_wait)
 {
-  int num;
-
-  /* If timer_wait is null here, that means either select() or poll() should
-   * block indefinitely, unless the thread_master has overriden it. select()
-   * and poll() differ in the timeout values they interpret as an indefinite
-   * block; select() requires a null pointer, while poll takes a millisecond
-   * value of -1.
-   *
-   * The thread_master owner has the option of overriding the default behavior
-   * by setting ->selectpoll_timeout. If the value is positive, it specifies
-   * the maximum number of milliseconds to wait. If the timeout is -1, it
-   * specifies that we should never wait and always return immediately even if
-   * no event is detected. If the value is zero, the behavior is default.
-   */
+  if (count == 0)
+    return 0;
 
+  /* If timer_wait is null here, that means poll() should block indefinitely,
+   * unless the thread_master has overriden it by setting ->selectpoll_timeout.
+   * If the value is positive, it specifies the maximum number of milliseconds
+   * to wait. If the timeout is -1, it specifies that we should never wait and
+   * always return immediately even if no event is detected. If the value is
+   * zero, the behavior is default. */
   int timeout = -1;
 
+  /* number of file descriptors with events */
+  int num;
+
   if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value
     timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
   else if (m->selectpoll_timeout > 0) // use the user's timeout
@@ -659,7 +667,17 @@ fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
   else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
     timeout = 0;
 
-  num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
+  /* add poll pipe poker */
+  assert (count + 1 < pfdsize);
+  pfds[count].fd = m->io_pipe[0];
+  pfds[count].events = POLLIN;
+  pfds[count].revents = 0x00;
+
+  num = poll (pfds, count + 1, timeout);
+
+  static unsigned char trash[64];
+  if (num > 0 && pfds[count].revents != 0 && num--)
+    while (read (m->io_pipe[0], &trash, sizeof (trash)) > 0);
 
   return num;
 }
@@ -691,7 +709,8 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
           break;
         }
 
-    assert (queuepos < m->handler.pfdsize);
+    /* make sure we have room for this fd + pipe poker fd */
+    assert (queuepos + 1 < m->handler.pfdsize);
 
     thread = thread_get (m, dir, func, arg, debugargpass);
 
@@ -761,6 +780,8 @@ funcname_thread_add_timer_timeval (struct thread_master *m,
         }
     }
     pthread_mutex_unlock (&thread->mtx);
+
+    AWAKEN (m);
   }
   pthread_mutex_unlock (&m->mtx);
 
@@ -868,6 +889,8 @@ funcname_thread_add_event (struct thread_master *m,
         *t_ptr = thread;
         thread->ref = t_ptr;
       }
+
+    AWAKEN (m);
   }
   pthread_mutex_unlock (&m->mtx);
 
@@ -1205,7 +1228,16 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
           timer_wait = &timer_val;
         }
 
-      num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait);
+      /* copy pollfds so we can unlock during blocking calls to poll() */
+      struct pollfd pfds[m->handler.pfdsize];
+      unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp;
+      memcpy (pfds, m->handler.pfds, count * sizeof (struct pollfd));
+
+      pthread_mutex_unlock (&m->mtx);
+      {
+        num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait);
+      }
+      pthread_mutex_lock (&m->mtx);
       
       /* Signals should get quick treatment */
       if (num < 0)
index e40bf64855607962908fa6d7379a8263f974b856..753aa41ffd131968c284e5ff3538f3e98f77113f 100644 (file)
@@ -66,6 +66,7 @@ struct thread_master
   struct thread_list ready;
   struct thread_list unuse;
   struct pqueue *background;
+  int io_pipe[2];
   int fd_limit;
   struct fd_handler handler;
   unsigned long alloc;