]> git.puffer.fish Git - matthieu/frr.git/commitdiff
lib: make thread.c pthread-safe
authorQuentin Young <qlyoung@cumulusnetworks.com>
Fri, 3 Mar 2017 19:01:49 +0000 (19:01 +0000)
committerQuentin Young <qlyoung@cumulusnetworks.com>
Fri, 28 Apr 2017 22:43:36 +0000 (22:43 +0000)
This change introduces synchronization mechanisms to thread.c in order
to allow safe concurrent use.

Thread.c should now be threadstafe with respect to:
* struct thread
* struct thread_master

Calls into thread.c for operations upon data of this type should not
require external synchronization.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
lib/thread.c
lib/thread.h

index e707fc584cf893dfc2e7b6ee77f3b9ab7ac7d5a9..3f7ab12b7fd6088e472de8e2673eb2e51f036572 100644 (file)
@@ -41,7 +41,7 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS,  "Thread stats")
 #include <mach/mach_time.h>
 #endif
 
-/* Relative time, since startup */
+static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER;
 static struct hash *cpu_record = NULL;
 
 static unsigned long
@@ -137,9 +137,14 @@ cpu_record_print(struct vty *vty, thread_type filter)
   vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
   vty_out(vty, " Avg uSec Max uSecs");
   vty_out(vty, "  Type  Thread%s", VTY_NEWLINE);
-  hash_iterate(cpu_record,
-              (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
-              args);
+
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    hash_iterate(cpu_record,
+                 (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
+                 args);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 
   if (tmp.total_calls > 0)
     vty_out_cpu_thread_history(vty, &tmp);
@@ -216,16 +221,25 @@ cpu_record_hash_clear (struct hash_backet *bucket,
   if ( !(a->types & *filter) )
        return;
   
-  hash_release (cpu_record, bucket->data);
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    hash_release (cpu_record, bucket->data);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 }
 
 static void
 cpu_record_clear (thread_type filter)
 {
   thread_type *tmp = &filter;
-  hash_iterate (cpu_record,
-               (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
-               tmp);
+
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    hash_iterate (cpu_record,
+                  (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
+                  tmp);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 }
 
 DEFUN (clear_thread_cpu,
@@ -326,16 +340,20 @@ thread_master_create (void)
 
   getrlimit(RLIMIT_NOFILE, &limit);
 
-  if (cpu_record == NULL) 
-    cpu_record 
-      = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
-                    (int (*) (const void *, const void *))cpu_record_hash_cmp);
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    if (cpu_record == NULL)
+      cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
+                                (int (*) (const void *, const void *))
+                                cpu_record_hash_cmp);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 
   rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
   if (rv == NULL)
-    {
-      return NULL;
-    }
+    return NULL;
+
+  pthread_mutex_init (&rv->mtx, NULL);
 
   rv->fd_limit = (int)limit.rlim_cur;
   rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
@@ -498,11 +516,16 @@ thread_queue_free (struct thread_master *m, struct pqueue *queue)
 void
 thread_master_free_unused (struct thread_master *m)
 {
-  struct thread *t;
-  while ((t = thread_trim_head(&m->unuse)) != NULL)
-    {
-      XFREE(MTYPE_THREAD, t);
-    }
+  pthread_mutex_lock (&m->mtx);
+  {
+    struct thread *t;
+    while ((t = thread_trim_head(&m->unuse)) != NULL)
+      {
+        pthread_mutex_destroy (&t->mtx);
+        XFREE(MTYPE_THREAD, t);
+      }
+  }
+  pthread_mutex_unlock (&m->mtx);
 }
 
 /* Stop thread scheduler. */
@@ -516,25 +539,37 @@ thread_master_free (struct thread_master *m)
   thread_list_free (m, &m->ready);
   thread_list_free (m, &m->unuse);
   thread_queue_free (m, m->background);
+  pthread_mutex_destroy (&m->mtx);
 
 #if defined(HAVE_POLL)
   XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
 #endif
   XFREE (MTYPE_THREAD_MASTER, m);
 
-  if (cpu_record)
-    {
-      hash_clean (cpu_record, cpu_record_hash_free);
-      hash_free (cpu_record);
-      cpu_record = NULL;
-    }
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    if (cpu_record)
+      {
+        hash_clean (cpu_record, cpu_record_hash_free);
+        hash_free (cpu_record);
+        cpu_record = NULL;
+      }
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 }
 
 /* Return remain time in second. */
 unsigned long
 thread_timer_remain_second (struct thread *thread)
 {
-  int64_t remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
+  int64_t remain;
+
+  pthread_mutex_lock (&thread->mtx);
+  {
+    remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
+  }
+  pthread_mutex_unlock (&thread->mtx);
+
   return remain < 0 ? 0 : remain;
 }
 
@@ -545,7 +580,11 @@ struct timeval
 thread_timer_remain(struct thread *thread)
 {
   struct timeval remain;
-  monotime_until(&thread->u.sands, &remain);
+  pthread_mutex_lock (&thread->mtx);
+  {
+    monotime_until(&thread->u.sands, &remain);
+  }
+  pthread_mutex_unlock (&thread->mtx);
   return remain;
 }
 
@@ -560,8 +599,11 @@ thread_get (struct thread_master *m, u_char type,
   if (! thread)
     {
       thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
+      /* mutex only needs to be initialized at struct creation. */
+      pthread_mutex_init (&thread->mtx, NULL);
       m->alloc++;
     }
+
   thread->type = type;
   thread->add_type = type;
   thread->master = m;
@@ -584,8 +626,12 @@ thread_get (struct thread_master *m, u_char type,
     {
       tmp.func = func;
       tmp.funcname = funcname;
-      thread->hist = hash_get (cpu_record, &tmp,
-                              (void * (*) (void *))cpu_record_hash_alloc);
+      pthread_mutex_lock (&cpu_record_mtx);
+      {
+        thread->hist = hash_get (cpu_record, &tmp,
+                                 (void * (*) (void *))cpu_record_hash_alloc);
+      }
+      pthread_mutex_unlock (&cpu_record_mtx);
     }
   thread->hist->total_active++;
   thread->func = func;
@@ -703,36 +749,39 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
 {
   struct thread *thread = NULL;
 
-#if !defined(HAVE_POLL)
-  thread_fd_set *fdset = NULL;
-  if (dir == THREAD_READ)
-    fdset = &m->handler.readfd;
-  else
-    fdset = &m->handler.writefd;
-#endif
-
+  pthread_mutex_lock (&m->mtx);
+  {
 #if defined (HAVE_POLL)
-  thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
-
-  if (thread == NULL)
-    return NULL;
+    thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
 #else
-  if (FD_ISSET (fd, fdset))
-    {
-      zlog_warn ("There is already %s fd [%d]",
-                 (dir == THREAD_READ) ? "read" : "write", fd);
-      return NULL;
-    }
+    thread_fd_set *fdset = NULL;
+    if (dir == THREAD_READ)
+      fdset = &m->handler.readfd;
+    else
+      fdset = &m->handler.writefd;
 
-  FD_SET (fd, fdset);
-  thread = thread_get (m, dir, func, arg, debugargpass);
+    if (FD_ISSET (fd, fdset))
+      {
+        zlog_warn ("There is already %s fd [%d]",
+                   (dir == THREAD_READ) ? "read" : "write", fd);
+      }
+    else
+      {
+        FD_SET (fd, fdset);
+        thread = thread_get (m, dir, func, arg, debugargpass);
+      }
 #endif
 
-  thread->u.fd = fd;
-  if (dir == THREAD_READ)
-    thread_add_fd (m->read, thread);
-  else
-    thread_add_fd (m->write, thread);
+    if (thread)
+    {
+      thread->u.fd = fd;
+      if (dir == THREAD_READ)
+        thread_add_fd (m->read, thread);
+      else
+        thread_add_fd (m->write, thread);
+    }
+  }
+  pthread_mutex_unlock (&m->mtx);
 
   return thread;
 }
@@ -754,7 +803,11 @@ funcname_thread_add_timer_timeval (struct thread_master *m,
   assert (time_relative);
   
   queue = ((type == THREAD_TIMER) ? m->timer : m->background);
-  thread = thread_get (m, type, func, arg, debugargpass);
+  pthread_mutex_lock (&m->mtx);
+  {
+    thread = thread_get (m, type, func, arg, debugargpass);
+  }
+  pthread_mutex_unlock (&m->mtx);
 
   monotime(&thread->u.sands);
   timeradd(&thread->u.sands, time_relative, &thread->u.sands);
@@ -847,9 +900,13 @@ funcname_thread_add_event (struct thread_master *m,
 
   assert (m != NULL);
 
-  thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
-  thread->u.val = val;
-  thread_list_add (&m->event, thread);
+  pthread_mutex_lock (&m->mtx);
+  {
+    thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
+    thread->u.val = val;
+    thread_list_add (&m->event, thread);
+  }
+  pthread_mutex_unlock (&m->mtx);
 
   return thread;
 }
@@ -880,14 +937,19 @@ thread_cancel_read_or_write (struct thread *thread, short int state)
   fd_clear_read_write (thread);
 }
 
-/* Cancel thread from scheduler. */
+/**
+ * Cancel thread from scheduler.
+ *
+ * This function is *NOT* MT-safe. DO NOT call it from any other thread than
+ * the primary thread.
+ */
 void
 thread_cancel (struct thread *thread)
 {
   struct thread_list *list = NULL;
   struct pqueue *queue = NULL;
   struct thread **thread_array = NULL;
-  
+
   switch (thread->type)
     {
     case THREAD_READ:
@@ -951,39 +1013,48 @@ thread_cancel_event (struct thread_master *m, void *arg)
 {
   unsigned int ret = 0;
   struct thread *thread;
+  struct thread *t;
 
-  thread = m->event.head;
-  while (thread)
-    {
-      struct thread *t;
-
-      t = thread;
-      thread = t->next;
-
-      if (t->arg == arg)
+  pthread_mutex_lock (&m->mtx);
+  {
+    thread = m->event.head;
+    while (thread)
+      {
+        t = thread;
+        pthread_mutex_lock (&t->mtx);
         {
-          ret++;
-          thread_list_delete (&m->event, t);
-          thread_add_unuse (m, t);
+          thread = t->next;
+
+          if (t->arg == arg)
+            {
+              ret++;
+              thread_list_delete (&m->event, t);
+              thread_add_unuse (m, t);
+            }
         }
-    }
-
-  /* thread can be on the ready list too */
-  thread = m->ready.head;
-  while (thread)
-    {
-      struct thread *t;
-
-      t = thread;
-      thread = t->next;
+        pthread_mutex_unlock (&t->mtx);
+      }
 
-      if (t->arg == arg)
+    /* thread can be on the ready list too */
+    thread = m->ready.head;
+    while (thread)
+      {
+        t = thread;
+        pthread_mutex_lock (&t->mtx);
         {
-          ret++;
-          thread_list_delete (&m->ready, t);
-          thread_add_unuse (m, t);
+          thread = t->next;
+
+          if (t->arg == arg)
+            {
+              ret++;
+              thread_list_delete (&m->ready, t);
+              thread_add_unuse (m, t);
+            }
         }
-    }
+        pthread_mutex_unlock (&t->mtx);
+      }
+  }
+  pthread_mutex_unlock (&m->mtx);
   return ret;
 }
 
@@ -1143,18 +1214,25 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
   struct timeval *timer_wait = &timer_val;
   struct timeval *timer_wait_bg;
 
+  pthread_mutex_lock (&m->mtx);
   while (1)
     {
       int num = 0;
 
       /* Signals pre-empt everything */
+      pthread_mutex_unlock (&m->mtx);
       quagga_sigevent_process ();
+      pthread_mutex_lock (&m->mtx);
        
       /* Drain the ready queue of already scheduled jobs, before scheduling
        * more.
        */
       if ((thread = thread_trim_head (&m->ready)) != NULL)
-        return thread_run (m, thread, fetch);
+        {
+          fetch = thread_run (m, thread, fetch);
+          pthread_mutex_unlock (&m->mtx);
+          return fetch;
+        }
       
       /* To be fair to all kinds of threads, and avoid starvation, we
        * need to be careful to consider all thread types for scheduling
@@ -1196,6 +1274,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
           if (errno == EINTR)
             continue; /* signal received - process it */
           zlog_warn ("select() error: %s", safe_strerror (errno));
+          pthread_mutex_unlock (&m->mtx);
           return NULL;
         }
 
@@ -1215,14 +1294,22 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
         list at this time.  If this is code is uncommented, then background
         timer threads will not run unless there is nothing else to do. */
       if ((thread = thread_trim_head (&m->ready)) != NULL)
-        return thread_run (m, thread, fetch);
+        {
+          fetch = thread_run (m, thread, fetch);
+          pthread_mutex_unlock (&m->mtx);
+          return fetch;
+        }
 #endif
 
       /* Background timer/events, lowest priority */
       thread_timer_process (m->background, &now);
       
       if ((thread = thread_trim_head (&m->ready)) != NULL)
-        return thread_run (m, thread, fetch);
+        {
+          fetch = thread_run (m, thread, fetch);
+          pthread_mutex_unlock (&m->mtx);
+          return fetch;
+        }
     }
 }
 
@@ -1248,13 +1335,23 @@ thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
 int
 thread_should_yield (struct thread *thread)
 {
-  return monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
+  int result;
+  pthread_mutex_lock (&thread->mtx);
+  {
+    result = monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
+  }
+  pthread_mutex_unlock (&thread->mtx);
+  return result;
 }
 
 void
 thread_set_yield_time (struct thread *thread, unsigned long yield_time)
 {
-  thread->yield = yield_time;
+  pthread_mutex_lock (&thread->mtx);
+  {
+    thread->yield = yield_time;
+  }
+  pthread_mutex_unlock (&thread->mtx);
 }
 
 void
@@ -1324,6 +1421,7 @@ funcname_thread_execute (struct thread_master *m,
 
   memset (&dummy, 0, sizeof (struct thread));
 
+  pthread_mutex_init (&dummy.mtx, NULL);
   dummy.type = THREAD_EVENT;
   dummy.add_type = THREAD_EXECUTE;
   dummy.master = NULL;
@@ -1332,8 +1430,12 @@ funcname_thread_execute (struct thread_master *m,
 
   tmp.func = dummy.func = func;
   tmp.funcname = dummy.funcname = funcname;
-  dummy.hist = hash_get (cpu_record, &tmp,
-                        (void * (*) (void *))cpu_record_hash_alloc);
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    dummy.hist = hash_get (cpu_record, &tmp,
+                           (void * (*) (void *))cpu_record_hash_alloc);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 
   dummy.schedfrom = schedfrom;
   dummy.schedfrom_line = fromln;
index 34adcc4d0991cc3beb494cf3a23630066d951c88..0cc9841272fcb52196496ca7756d648cf8310277 100644 (file)
@@ -24,6 +24,7 @@
 
 #include <zebra.h>
 #include "monotime.h"
+#include <pthread.h>
 
 struct rusage_t
 {
@@ -84,6 +85,7 @@ struct thread_master
   int fd_limit;
   struct fd_handler handler;
   unsigned long alloc;
+  pthread_mutex_t mtx;
 };
 
 typedef unsigned char thread_type;
@@ -110,6 +112,7 @@ struct thread
   const char *funcname;
   const char *schedfrom;
   int schedfrom_line;
+  pthread_mutex_t mtx;
 };
 
 struct cpu_thread_history