From 1189d95fcab50132623cb8f8c25f78ed68ec36e2 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Fri, 3 Mar 2017 19:01:49 +0000 Subject: [PATCH] lib: make thread.c pthread-safe This change introduces synchronization mechanisms to thread.c in order to allow safe concurrent use. Thread.c should now be threadstafe with respect to: * struct thread * struct thread_master Calls into thread.c for operations upon data of this type should not require external synchronization. Signed-off-by: Quentin Young --- lib/thread.c | 292 ++++++++++++++++++++++++++++++++++----------------- lib/thread.h | 3 + 2 files changed, 200 insertions(+), 95 deletions(-) diff --git a/lib/thread.c b/lib/thread.c index e707fc584c..3f7ab12b7f 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -41,7 +41,7 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats") #include #endif -/* Relative time, since startup */ +static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER; static struct hash *cpu_record = NULL; static unsigned long @@ -137,9 +137,14 @@ cpu_record_print(struct vty *vty, thread_type filter) vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs"); vty_out(vty, " Avg uSec Max uSecs"); vty_out(vty, " Type Thread%s", VTY_NEWLINE); - hash_iterate(cpu_record, - (void(*)(struct hash_backet*,void*))cpu_record_hash_print, - args); + + pthread_mutex_lock (&cpu_record_mtx); + { + hash_iterate(cpu_record, + (void(*)(struct hash_backet*,void*))cpu_record_hash_print, + args); + } + pthread_mutex_unlock (&cpu_record_mtx); if (tmp.total_calls > 0) vty_out_cpu_thread_history(vty, &tmp); @@ -216,16 +221,25 @@ cpu_record_hash_clear (struct hash_backet *bucket, if ( !(a->types & *filter) ) return; - hash_release (cpu_record, bucket->data); + pthread_mutex_lock (&cpu_record_mtx); + { + hash_release (cpu_record, bucket->data); + } + pthread_mutex_unlock (&cpu_record_mtx); } static void cpu_record_clear (thread_type filter) { thread_type *tmp = &filter; - hash_iterate (cpu_record, - (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear, - tmp); + + pthread_mutex_lock (&cpu_record_mtx); + { + hash_iterate (cpu_record, + (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear, + tmp); + } + pthread_mutex_unlock (&cpu_record_mtx); } DEFUN (clear_thread_cpu, @@ -326,16 +340,20 @@ thread_master_create (void) getrlimit(RLIMIT_NOFILE, &limit); - if (cpu_record == NULL) - cpu_record - = hash_create ((unsigned int (*) (void *))cpu_record_hash_key, - (int (*) (const void *, const void *))cpu_record_hash_cmp); + pthread_mutex_lock (&cpu_record_mtx); + { + if (cpu_record == NULL) + cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key, + (int (*) (const void *, const void *)) + cpu_record_hash_cmp); + } + pthread_mutex_unlock (&cpu_record_mtx); rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master)); if (rv == NULL) - { - return NULL; - } + return NULL; + + pthread_mutex_init (&rv->mtx, NULL); rv->fd_limit = (int)limit.rlim_cur; rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit); @@ -498,11 +516,16 @@ thread_queue_free (struct thread_master *m, struct pqueue *queue) void thread_master_free_unused (struct thread_master *m) { - struct thread *t; - while ((t = thread_trim_head(&m->unuse)) != NULL) - { - XFREE(MTYPE_THREAD, t); - } + pthread_mutex_lock (&m->mtx); + { + struct thread *t; + while ((t = thread_trim_head(&m->unuse)) != NULL) + { + pthread_mutex_destroy (&t->mtx); + XFREE(MTYPE_THREAD, t); + } + } + pthread_mutex_unlock (&m->mtx); } /* Stop thread scheduler. */ @@ -516,25 +539,37 @@ thread_master_free (struct thread_master *m) thread_list_free (m, &m->ready); thread_list_free (m, &m->unuse); thread_queue_free (m, m->background); + pthread_mutex_destroy (&m->mtx); #if defined(HAVE_POLL) XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); #endif XFREE (MTYPE_THREAD_MASTER, m); - if (cpu_record) - { - hash_clean (cpu_record, cpu_record_hash_free); - hash_free (cpu_record); - cpu_record = NULL; - } + pthread_mutex_lock (&cpu_record_mtx); + { + if (cpu_record) + { + hash_clean (cpu_record, cpu_record_hash_free); + hash_free (cpu_record); + cpu_record = NULL; + } + } + pthread_mutex_unlock (&cpu_record_mtx); } /* Return remain time in second. */ unsigned long thread_timer_remain_second (struct thread *thread) { - int64_t remain = monotime_until(&thread->u.sands, NULL) / 1000000LL; + int64_t remain; + + pthread_mutex_lock (&thread->mtx); + { + remain = monotime_until(&thread->u.sands, NULL) / 1000000LL; + } + pthread_mutex_unlock (&thread->mtx); + return remain < 0 ? 0 : remain; } @@ -545,7 +580,11 @@ struct timeval thread_timer_remain(struct thread *thread) { struct timeval remain; - monotime_until(&thread->u.sands, &remain); + pthread_mutex_lock (&thread->mtx); + { + monotime_until(&thread->u.sands, &remain); + } + pthread_mutex_unlock (&thread->mtx); return remain; } @@ -560,8 +599,11 @@ thread_get (struct thread_master *m, u_char type, if (! thread) { thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread)); + /* mutex only needs to be initialized at struct creation. */ + pthread_mutex_init (&thread->mtx, NULL); m->alloc++; } + thread->type = type; thread->add_type = type; thread->master = m; @@ -584,8 +626,12 @@ thread_get (struct thread_master *m, u_char type, { tmp.func = func; tmp.funcname = funcname; - thread->hist = hash_get (cpu_record, &tmp, - (void * (*) (void *))cpu_record_hash_alloc); + pthread_mutex_lock (&cpu_record_mtx); + { + thread->hist = hash_get (cpu_record, &tmp, + (void * (*) (void *))cpu_record_hash_alloc); + } + pthread_mutex_unlock (&cpu_record_mtx); } thread->hist->total_active++; thread->func = func; @@ -703,36 +749,39 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, { struct thread *thread = NULL; -#if !defined(HAVE_POLL) - thread_fd_set *fdset = NULL; - if (dir == THREAD_READ) - fdset = &m->handler.readfd; - else - fdset = &m->handler.writefd; -#endif - + pthread_mutex_lock (&m->mtx); + { #if defined (HAVE_POLL) - thread = generic_thread_add(m, func, arg, fd, dir, debugargpass); - - if (thread == NULL) - return NULL; + thread = generic_thread_add(m, func, arg, fd, dir, debugargpass); #else - if (FD_ISSET (fd, fdset)) - { - zlog_warn ("There is already %s fd [%d]", - (dir == THREAD_READ) ? "read" : "write", fd); - return NULL; - } + thread_fd_set *fdset = NULL; + if (dir == THREAD_READ) + fdset = &m->handler.readfd; + else + fdset = &m->handler.writefd; - FD_SET (fd, fdset); - thread = thread_get (m, dir, func, arg, debugargpass); + if (FD_ISSET (fd, fdset)) + { + zlog_warn ("There is already %s fd [%d]", + (dir == THREAD_READ) ? "read" : "write", fd); + } + else + { + FD_SET (fd, fdset); + thread = thread_get (m, dir, func, arg, debugargpass); + } #endif - thread->u.fd = fd; - if (dir == THREAD_READ) - thread_add_fd (m->read, thread); - else - thread_add_fd (m->write, thread); + if (thread) + { + thread->u.fd = fd; + if (dir == THREAD_READ) + thread_add_fd (m->read, thread); + else + thread_add_fd (m->write, thread); + } + } + pthread_mutex_unlock (&m->mtx); return thread; } @@ -754,7 +803,11 @@ funcname_thread_add_timer_timeval (struct thread_master *m, assert (time_relative); queue = ((type == THREAD_TIMER) ? m->timer : m->background); - thread = thread_get (m, type, func, arg, debugargpass); + pthread_mutex_lock (&m->mtx); + { + thread = thread_get (m, type, func, arg, debugargpass); + } + pthread_mutex_unlock (&m->mtx); monotime(&thread->u.sands); timeradd(&thread->u.sands, time_relative, &thread->u.sands); @@ -847,9 +900,13 @@ funcname_thread_add_event (struct thread_master *m, assert (m != NULL); - thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass); - thread->u.val = val; - thread_list_add (&m->event, thread); + pthread_mutex_lock (&m->mtx); + { + thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass); + thread->u.val = val; + thread_list_add (&m->event, thread); + } + pthread_mutex_unlock (&m->mtx); return thread; } @@ -880,14 +937,19 @@ thread_cancel_read_or_write (struct thread *thread, short int state) fd_clear_read_write (thread); } -/* Cancel thread from scheduler. */ +/** + * Cancel thread from scheduler. + * + * This function is *NOT* MT-safe. DO NOT call it from any other thread than + * the primary thread. + */ void thread_cancel (struct thread *thread) { struct thread_list *list = NULL; struct pqueue *queue = NULL; struct thread **thread_array = NULL; - + switch (thread->type) { case THREAD_READ: @@ -951,39 +1013,48 @@ thread_cancel_event (struct thread_master *m, void *arg) { unsigned int ret = 0; struct thread *thread; + struct thread *t; - thread = m->event.head; - while (thread) - { - struct thread *t; - - t = thread; - thread = t->next; - - if (t->arg == arg) + pthread_mutex_lock (&m->mtx); + { + thread = m->event.head; + while (thread) + { + t = thread; + pthread_mutex_lock (&t->mtx); { - ret++; - thread_list_delete (&m->event, t); - thread_add_unuse (m, t); + thread = t->next; + + if (t->arg == arg) + { + ret++; + thread_list_delete (&m->event, t); + thread_add_unuse (m, t); + } } - } - - /* thread can be on the ready list too */ - thread = m->ready.head; - while (thread) - { - struct thread *t; - - t = thread; - thread = t->next; + pthread_mutex_unlock (&t->mtx); + } - if (t->arg == arg) + /* thread can be on the ready list too */ + thread = m->ready.head; + while (thread) + { + t = thread; + pthread_mutex_lock (&t->mtx); { - ret++; - thread_list_delete (&m->ready, t); - thread_add_unuse (m, t); + thread = t->next; + + if (t->arg == arg) + { + ret++; + thread_list_delete (&m->ready, t); + thread_add_unuse (m, t); + } } - } + pthread_mutex_unlock (&t->mtx); + } + } + pthread_mutex_unlock (&m->mtx); return ret; } @@ -1143,18 +1214,25 @@ thread_fetch (struct thread_master *m, struct thread *fetch) struct timeval *timer_wait = &timer_val; struct timeval *timer_wait_bg; + pthread_mutex_lock (&m->mtx); while (1) { int num = 0; /* Signals pre-empt everything */ + pthread_mutex_unlock (&m->mtx); quagga_sigevent_process (); + pthread_mutex_lock (&m->mtx); /* Drain the ready queue of already scheduled jobs, before scheduling * more. */ if ((thread = thread_trim_head (&m->ready)) != NULL) - return thread_run (m, thread, fetch); + { + fetch = thread_run (m, thread, fetch); + pthread_mutex_unlock (&m->mtx); + return fetch; + } /* To be fair to all kinds of threads, and avoid starvation, we * need to be careful to consider all thread types for scheduling @@ -1196,6 +1274,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch) if (errno == EINTR) continue; /* signal received - process it */ zlog_warn ("select() error: %s", safe_strerror (errno)); + pthread_mutex_unlock (&m->mtx); return NULL; } @@ -1215,14 +1294,22 @@ thread_fetch (struct thread_master *m, struct thread *fetch) list at this time. If this is code is uncommented, then background timer threads will not run unless there is nothing else to do. */ if ((thread = thread_trim_head (&m->ready)) != NULL) - return thread_run (m, thread, fetch); + { + fetch = thread_run (m, thread, fetch); + pthread_mutex_unlock (&m->mtx); + return fetch; + } #endif /* Background timer/events, lowest priority */ thread_timer_process (m->background, &now); if ((thread = thread_trim_head (&m->ready)) != NULL) - return thread_run (m, thread, fetch); + { + fetch = thread_run (m, thread, fetch); + pthread_mutex_unlock (&m->mtx); + return fetch; + } } } @@ -1248,13 +1335,23 @@ thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime) int thread_should_yield (struct thread *thread) { - return monotime_since(&thread->real, NULL) > (int64_t)thread->yield; + int result; + pthread_mutex_lock (&thread->mtx); + { + result = monotime_since(&thread->real, NULL) > (int64_t)thread->yield; + } + pthread_mutex_unlock (&thread->mtx); + return result; } void thread_set_yield_time (struct thread *thread, unsigned long yield_time) { - thread->yield = yield_time; + pthread_mutex_lock (&thread->mtx); + { + thread->yield = yield_time; + } + pthread_mutex_unlock (&thread->mtx); } void @@ -1324,6 +1421,7 @@ funcname_thread_execute (struct thread_master *m, memset (&dummy, 0, sizeof (struct thread)); + pthread_mutex_init (&dummy.mtx, NULL); dummy.type = THREAD_EVENT; dummy.add_type = THREAD_EXECUTE; dummy.master = NULL; @@ -1332,8 +1430,12 @@ funcname_thread_execute (struct thread_master *m, tmp.func = dummy.func = func; tmp.funcname = dummy.funcname = funcname; - dummy.hist = hash_get (cpu_record, &tmp, - (void * (*) (void *))cpu_record_hash_alloc); + pthread_mutex_lock (&cpu_record_mtx); + { + dummy.hist = hash_get (cpu_record, &tmp, + (void * (*) (void *))cpu_record_hash_alloc); + } + pthread_mutex_unlock (&cpu_record_mtx); dummy.schedfrom = schedfrom; dummy.schedfrom_line = fromln; diff --git a/lib/thread.h b/lib/thread.h index 34adcc4d09..0cc9841272 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -24,6 +24,7 @@ #include #include "monotime.h" +#include struct rusage_t { @@ -84,6 +85,7 @@ struct thread_master int fd_limit; struct fd_handler handler; unsigned long alloc; + pthread_mutex_t mtx; }; typedef unsigned char thread_type; @@ -110,6 +112,7 @@ struct thread const char *funcname; const char *schedfrom; int schedfrom_line; + pthread_mutex_t mtx; }; struct cpu_thread_history -- 2.39.5