diff options
Diffstat (limited to 'lib/thread.c')
| -rw-r--r-- | lib/thread.c | 1024 |
1 files changed, 488 insertions, 536 deletions
diff --git a/lib/thread.c b/lib/thread.c index e71b1cb139..848e39e1ae 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -13,10 +13,9 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with GNU Zebra; see the file COPYING. If not, write to the Free - * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA - * 02111-1307, USA. + * You should have received a copy of the GNU General Public License along + * with this program; see the file COPYING; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ /* #define DEBUG */ @@ -31,6 +30,7 @@ #include "pqueue.h" #include "command.h" #include "sigevent.h" +#include "network.h" DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread") DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master") @@ -41,151 +41,22 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats") #include <mach/mach_time.h> #endif -/* Recent absolute time of day */ -struct timeval recent_time; -/* Relative time, since startup */ -static struct timeval relative_time; +#define AWAKEN(m) \ + do { \ + static unsigned char wakebyte = 0x01; \ + write (m->io_pipe[1], &wakebyte, 1); \ + } while (0); +static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER; static struct hash *cpu_record = NULL; -/* Adjust so that tv_usec is in the range [0,TIMER_SECOND_MICRO). - And change negative values to 0. */ -static struct timeval -timeval_adjust (struct timeval a) -{ - while (a.tv_usec >= TIMER_SECOND_MICRO) - { - a.tv_usec -= TIMER_SECOND_MICRO; - a.tv_sec++; - } - - while (a.tv_usec < 0) - { - a.tv_usec += TIMER_SECOND_MICRO; - a.tv_sec--; - } - - if (a.tv_sec < 0) - /* Change negative timeouts to 0. */ - a.tv_sec = a.tv_usec = 0; - - return a; -} - -static struct timeval -timeval_subtract (struct timeval a, struct timeval b) -{ - struct timeval ret; - - ret.tv_usec = a.tv_usec - b.tv_usec; - ret.tv_sec = a.tv_sec - b.tv_sec; - - return timeval_adjust (ret); -} - -static long -timeval_cmp (struct timeval a, struct timeval b) -{ - return (a.tv_sec == b.tv_sec - ? a.tv_usec - b.tv_usec : a.tv_sec - b.tv_sec); -} - -unsigned long +static unsigned long timeval_elapsed (struct timeval a, struct timeval b) { return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO) + (a.tv_usec - b.tv_usec)); } -/* gettimeofday wrapper, to keep recent_time updated */ -static int -quagga_gettimeofday (struct timeval *tv) -{ - int ret; - - assert (tv); - - if (!(ret = gettimeofday (&recent_time, NULL))) - { - /* avoid copy if user passed recent_time pointer.. */ - if (tv != &recent_time) - *tv = recent_time; - return 0; - } - return ret; -} - -static int -quagga_get_relative (struct timeval *tv) -{ - int ret; - -#ifdef HAVE_CLOCK_MONOTONIC - { - struct timespec tp; - if (!(ret = clock_gettime (CLOCK_MONOTONIC, &tp))) - { - relative_time.tv_sec = tp.tv_sec; - relative_time.tv_usec = tp.tv_nsec / 1000; - } - } -#elif defined(__APPLE__) - { - uint64_t ticks; - uint64_t useconds; - static mach_timebase_info_data_t timebase_info; - - ticks = mach_absolute_time(); - if (timebase_info.denom == 0) - mach_timebase_info(&timebase_info); - - useconds = ticks * timebase_info.numer / timebase_info.denom / 1000; - relative_time.tv_sec = useconds / 1000000; - relative_time.tv_usec = useconds % 1000000; - - return 0; - } -#else /* !HAVE_CLOCK_MONOTONIC && !__APPLE__ */ -#error no monotonic clock on this system -#endif /* HAVE_CLOCK_MONOTONIC */ - - if (tv) - *tv = relative_time; - - return ret; -} - -/* Exported Quagga timestamp function. - * Modelled on POSIX clock_gettime. - */ -int -quagga_gettime (enum quagga_clkid clkid, struct timeval *tv) -{ - switch (clkid) - { - case QUAGGA_CLK_MONOTONIC: - return quagga_get_relative (tv); - default: - errno = EINVAL; - return -1; - } -} - -time_t -quagga_monotime (void) -{ - struct timeval tv; - quagga_get_relative(&tv); - return tv.tv_sec; -} - -/* Public export of recent_relative_time by value */ -struct timeval -recent_relative_time (void) -{ - return relative_time; -} - static unsigned int cpu_record_hash_key (struct cpu_thread_history *a) { @@ -221,8 +92,8 @@ static void vty_out_cpu_thread_history(struct vty* vty, struct cpu_thread_history *a) { - vty_out(vty, "%10ld.%03ld %9d %8ld %9ld %8ld %9ld", - a->cpu.total/1000, a->cpu.total%1000, a->total_calls, + vty_out(vty, "%5d %10ld.%03ld %9d %8ld %9ld %8ld %9ld", + a->total_active, a->cpu.total/1000, a->cpu.total%1000, a->total_calls, a->cpu.total/a->total_calls, a->cpu.max, a->real.total/a->total_calls, a->real.max); vty_out(vty, " %c%c%c%c%c%c %s%s", @@ -247,6 +118,7 @@ cpu_record_hash_print(struct hash_backet *bucket, if ( !(a->types & *filter) ) return; vty_out_cpu_thread_history(vty,a); + totals->total_active += a->total_active; totals->total_calls += a->total_calls; totals->real.total += a->real.total; if (totals->real.max < a->real.max) @@ -268,12 +140,17 @@ cpu_record_print(struct vty *vty, thread_type filter) vty_out(vty, "%21s %18s %18s%s", "", "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE); - vty_out(vty, " Runtime(ms) Invoked Avg uSec Max uSecs"); + vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs"); vty_out(vty, " Avg uSec Max uSecs"); vty_out(vty, " Type Thread%s", VTY_NEWLINE); - hash_iterate(cpu_record, - (void(*)(struct hash_backet*,void*))cpu_record_hash_print, - args); + + pthread_mutex_lock (&cpu_record_mtx); + { + hash_iterate(cpu_record, + (void(*)(struct hash_backet*,void*))cpu_record_hash_print, + args); + } + pthread_mutex_unlock (&cpu_record_mtx); if (tmp.total_calls > 0) vty_out_cpu_thread_history(vty, &tmp); @@ -287,15 +164,16 @@ DEFUN (show_thread_cpu, "Thread CPU usage\n" "Display filter (rwtexb)\n") { + int idx_filter = 3; int i = 0; thread_type filter = (thread_type) -1U; - if (argc > 0) + if (argc > 3) { filter = 0; - while (argv[0][i] != '\0') + while (argv[idx_filter]->arg[i] != '\0') { - switch ( argv[0][i] ) + switch ( argv[idx_filter]->arg[i] ) { case 'r': case 'R': @@ -330,7 +208,7 @@ DEFUN (show_thread_cpu, { vty_out(vty, "Invalid filter \"%s\" specified," " must contain at least one of 'RWTEXB'%s", - argv[0], VTY_NEWLINE); + argv[idx_filter]->arg, VTY_NEWLINE); return CMD_WARNING; } } @@ -349,16 +227,25 @@ cpu_record_hash_clear (struct hash_backet *bucket, if ( !(a->types & *filter) ) return; - hash_release (cpu_record, bucket->data); + pthread_mutex_lock (&cpu_record_mtx); + { + hash_release (cpu_record, bucket->data); + } + pthread_mutex_unlock (&cpu_record_mtx); } static void cpu_record_clear (thread_type filter) { thread_type *tmp = &filter; - hash_iterate (cpu_record, - (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear, - tmp); + + pthread_mutex_lock (&cpu_record_mtx); + { + hash_iterate (cpu_record, + (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear, + tmp); + } + pthread_mutex_unlock (&cpu_record_mtx); } DEFUN (clear_thread_cpu, @@ -369,15 +256,16 @@ DEFUN (clear_thread_cpu, "Thread CPU usage\n" "Display filter (rwtexb)\n") { + int idx_filter = 3; int i = 0; thread_type filter = (thread_type) -1U; - if (argc > 0) + if (argc > 3) { filter = 0; - while (argv[0][i] != '\0') + while (argv[idx_filter]->arg[i] != '\0') { - switch ( argv[0][i] ) + switch ( argv[idx_filter]->arg[i] ) { case 'r': case 'R': @@ -412,7 +300,7 @@ DEFUN (clear_thread_cpu, { vty_out(vty, "Invalid filter \"%s\" specified," " must contain at least one of 'RWTEXB'%s", - argv[0], VTY_NEWLINE); + argv[idx_filter]->arg, VTY_NEWLINE); return CMD_WARNING; } } @@ -421,17 +309,22 @@ DEFUN (clear_thread_cpu, return CMD_SUCCESS; } +void +thread_cmd_init (void) +{ + install_element (VIEW_NODE, &show_thread_cpu_cmd); + install_element (ENABLE_NODE, &clear_thread_cpu_cmd); +} + static int thread_timer_cmp(void *a, void *b) { struct thread *thread_a = a; struct thread *thread_b = b; - long cmp = timeval_cmp(thread_a->u.sands, thread_b->u.sands); - - if (cmp < 0) + if (timercmp (&thread_a->u.sands, &thread_b->u.sands, <)) return -1; - if (cmp > 0) + if (timercmp (&thread_a->u.sands, &thread_b->u.sands, >)) return 1; return 0; } @@ -453,16 +346,20 @@ thread_master_create (void) getrlimit(RLIMIT_NOFILE, &limit); - if (cpu_record == NULL) - cpu_record - = hash_create ((unsigned int (*) (void *))cpu_record_hash_key, - (int (*) (const void *, const void *))cpu_record_hash_cmp); + pthread_mutex_lock (&cpu_record_mtx); + { + if (cpu_record == NULL) + cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key, + (int (*) (const void *, const void *)) + cpu_record_hash_cmp); + } + pthread_mutex_unlock (&cpu_record_mtx); rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master)); if (rv == NULL) - { - return NULL; - } + return NULL; + + pthread_mutex_init (&rv->mtx, NULL); rv->fd_limit = (int)limit.rlim_cur; rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit); @@ -485,13 +382,18 @@ thread_master_create (void) rv->background = pqueue_create(); rv->timer->cmp = rv->background->cmp = thread_timer_cmp; rv->timer->update = rv->background->update = thread_timer_update; + rv->spin = true; + rv->handle_signals = true; + rv->owner = pthread_self(); + pipe (rv->io_pipe); + set_nonblocking (rv->io_pipe[0]); + set_nonblocking (rv->io_pipe[1]); -#if defined(HAVE_POLL_CALL) rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct pollfd) * rv->handler.pfdsize); -#endif + return rv; } @@ -526,18 +428,6 @@ thread_list_delete (struct thread_list *list, struct thread *thread) return thread; } -static void -thread_delete_fd (struct thread **thread_array, struct thread *thread) -{ - thread_array[thread->u.fd] = NULL; -} - -static void -thread_add_fd (struct thread **thread_array, struct thread *thread) -{ - thread_array[thread->u.fd] = thread; -} - /* Thread list is empty or not. */ static int thread_empty (struct thread_list *list) @@ -561,7 +451,10 @@ thread_add_unuse (struct thread_master *m, struct thread *thread) assert (m != NULL && thread != NULL); assert (thread->next == NULL); assert (thread->prev == NULL); - assert (thread->type == THREAD_UNUSED); + thread->ref = NULL; + + thread->type = THREAD_UNUSED; + thread->hist->total_active--; thread_list_add (&m->unuse, thread); } @@ -623,11 +516,16 @@ thread_queue_free (struct thread_master *m, struct pqueue *queue) void thread_master_free_unused (struct thread_master *m) { - struct thread *t; - while ((t = thread_trim_head(&m->unuse)) != NULL) - { - XFREE(MTYPE_THREAD, t); - } + pthread_mutex_lock (&m->mtx); + { + struct thread *t; + while ((t = thread_trim_head(&m->unuse)) != NULL) + { + pthread_mutex_destroy (&t->mtx); + XFREE(MTYPE_THREAD, t); + } + } + pthread_mutex_unlock (&m->mtx); } /* Stop thread scheduler. */ @@ -641,30 +539,38 @@ thread_master_free (struct thread_master *m) thread_list_free (m, &m->ready); thread_list_free (m, &m->unuse); thread_queue_free (m, m->background); + pthread_mutex_destroy (&m->mtx); + close (m->io_pipe[0]); + close (m->io_pipe[1]); -#if defined(HAVE_POLL_CALL) XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); -#endif XFREE (MTYPE_THREAD_MASTER, m); - if (cpu_record) - { - hash_clean (cpu_record, cpu_record_hash_free); - hash_free (cpu_record); - cpu_record = NULL; - } + pthread_mutex_lock (&cpu_record_mtx); + { + if (cpu_record) + { + hash_clean (cpu_record, cpu_record_hash_free); + hash_free (cpu_record); + cpu_record = NULL; + } + } + pthread_mutex_unlock (&cpu_record_mtx); } /* Return remain time in second. */ unsigned long thread_timer_remain_second (struct thread *thread) { - quagga_get_relative (NULL); - - if (thread->u.sands.tv_sec - relative_time.tv_sec > 0) - return thread->u.sands.tv_sec - relative_time.tv_sec; - else - return 0; + int64_t remain; + + pthread_mutex_lock (&thread->mtx); + { + remain = monotime_until(&thread->u.sands, NULL) / 1000000LL; + } + pthread_mutex_unlock (&thread->mtx); + + return remain < 0 ? 0 : remain; } #define debugargdef const char *funcname, const char *schedfrom, int fromln @@ -673,9 +579,13 @@ thread_timer_remain_second (struct thread *thread) struct timeval thread_timer_remain(struct thread *thread) { - quagga_get_relative(NULL); - - return timeval_subtract(thread->u.sands, relative_time); + struct timeval remain; + pthread_mutex_lock (&thread->mtx); + { + monotime_until(&thread->u.sands, &remain); + } + pthread_mutex_unlock (&thread->mtx); + return remain; } /* Get new thread. */ @@ -684,20 +594,48 @@ thread_get (struct thread_master *m, u_char type, int (*func) (struct thread *), void *arg, debugargdef) { struct thread *thread = thread_trim_head (&m->unuse); + struct cpu_thread_history tmp; if (! thread) { thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread)); + /* mutex only needs to be initialized at struct creation. */ + pthread_mutex_init (&thread->mtx, NULL); m->alloc++; } + thread->type = type; thread->add_type = type; thread->master = m; - thread->func = func; thread->arg = arg; thread->index = -1; thread->yield = THREAD_YIELD_TIME_SLOT; /* default */ - + thread->ref = NULL; + + /* + * So if the passed in funcname is not what we have + * stored that means the thread->hist needs to be + * updated. We keep the last one around in unused + * under the assumption that we are probably + * going to immediately allocate the same + * type of thread. + * This hopefully saves us some serious + * hash_get lookups. + */ + if (thread->funcname != funcname || + thread->func != func) + { + tmp.func = func; + tmp.funcname = funcname; + pthread_mutex_lock (&cpu_record_mtx); + { + thread->hist = hash_get (cpu_record, &tmp, + (void * (*) (void *))cpu_record_hash_alloc); + } + pthread_mutex_unlock (&cpu_record_mtx); + } + thread->hist->total_active++; + thread->func = func; thread->funcname = funcname; thread->schedfrom = schedfrom; thread->schedfrom_line = fromln; @@ -705,173 +643,151 @@ thread_get (struct thread_master *m, u_char type, return thread; } -#if defined (HAVE_POLL_CALL) - -#define fd_copy_fd_set(X) (X) - -/* generic add thread function */ -static struct thread * -generic_thread_add(struct thread_master *m, int (*func) (struct thread *), - void *arg, int fd, int dir, debugargdef) +static int +fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, + nfds_t count, struct timeval *timer_wait) { - struct thread *thread; - - u_char type; - short int event; - - if (dir == THREAD_READ) - { - event = (POLLIN | POLLHUP); - type = THREAD_READ; - } - else - { - event = (POLLOUT | POLLHUP); - type = THREAD_WRITE; - } + if (count == 0) + return 0; - nfds_t queuepos = m->handler.pfdcount; - nfds_t i=0; - for (i=0; i<m->handler.pfdcount; i++) - if (m->handler.pfds[i].fd == fd) - { - queuepos = i; - break; - } + /* If timer_wait is null here, that means poll() should block indefinitely, + * unless the thread_master has overriden it by setting ->selectpoll_timeout. + * If the value is positive, it specifies the maximum number of milliseconds + * to wait. If the timeout is -1, it specifies that we should never wait and + * always return immediately even if no event is detected. If the value is + * zero, the behavior is default. */ + int timeout = -1; - /* is there enough space for a new fd? */ - assert (queuepos < m->handler.pfdsize); + /* number of file descriptors with events */ + int num; - thread = thread_get (m, type, func, arg, debugargpass); - m->handler.pfds[queuepos].fd = fd; - m->handler.pfds[queuepos].events |= event; - if (queuepos == m->handler.pfdcount) - m->handler.pfdcount++; + if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value + timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000); + else if (m->selectpoll_timeout > 0) // use the user's timeout + timeout = m->selectpoll_timeout; + else if (m->selectpoll_timeout < 0) // effect a poll (return immediately) + timeout = 0; - return thread; -} -#else + /* add poll pipe poker */ + assert (count + 1 < pfdsize); + pfds[count].fd = m->io_pipe[0]; + pfds[count].events = POLLIN; + pfds[count].revents = 0x00; -#define fd_copy_fd_set(X) (X) -#endif + num = poll (pfds, count + 1, timeout); -static int -fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait) -{ - int num; -#if defined(HAVE_POLL_CALL) - /* recalc timeout for poll. Attention NULL pointer is no timeout with - select, where with poll no timeount is -1 */ - int timeout = -1; - if (timer_wait != NULL) - timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000); - - num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout); -#else - num = select (size, read, write, except, timer_wait); -#endif + static unsigned char trash[64]; + if (num > 0 && pfds[count].revents != 0 && num--) + while (read (m->io_pipe[0], &trash, sizeof (trash)) > 0); return num; } -static int -fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos) +/* Add new read thread. */ +struct thread * +funcname_thread_add_read_write (int dir, struct thread_master *m, + int (*func) (struct thread *), void *arg, int fd, struct thread **t_ptr, + debugargdef) { -#if defined(HAVE_POLL_CALL) - return 1; -#else - return FD_ISSET (THREAD_FD (thread), fdset); -#endif -} + struct thread *thread = NULL; -static int -fd_clear_read_write (struct thread *thread) -{ -#if !defined(HAVE_POLL_CALL) - thread_fd_set *fdset = NULL; - int fd = THREAD_FD (thread); + pthread_mutex_lock (&m->mtx); + { + if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule + { + pthread_mutex_unlock (&m->mtx); + return NULL; + } - if (thread->type == THREAD_READ) - fdset = &thread->master->handler.readfd; - else - fdset = &thread->master->handler.writefd; + /* default to a new pollfd */ + nfds_t queuepos = m->handler.pfdcount; - if (!FD_ISSET (fd, fdset)) - return 0; + /* if we already have a pollfd for our file descriptor, find and use it */ + for (nfds_t i = 0; i < m->handler.pfdcount; i++) + if (m->handler.pfds[i].fd == fd) + { + queuepos = i; + break; + } - FD_CLR (fd, fdset); -#endif - return 1; -} + /* make sure we have room for this fd + pipe poker fd */ + assert (queuepos + 1 < m->handler.pfdsize); -/* Add new read thread. */ -struct thread * -funcname_thread_add_read_write (int dir, struct thread_master *m, - int (*func) (struct thread *), void *arg, int fd, - debugargdef) -{ - struct thread *thread = NULL; + thread = thread_get (m, dir, func, arg, debugargpass); -#if !defined(HAVE_POLL_CALL) - thread_fd_set *fdset = NULL; - if (dir == THREAD_READ) - fdset = &m->handler.readfd; - else - fdset = &m->handler.writefd; -#endif + m->handler.pfds[queuepos].fd = fd; + m->handler.pfds[queuepos].events |= (dir == THREAD_READ ? POLLIN : POLLOUT); -#if defined (HAVE_POLL_CALL) - thread = generic_thread_add(m, func, arg, fd, dir, debugargpass); + if (queuepos == m->handler.pfdcount) + m->handler.pfdcount++; - if (thread == NULL) - return NULL; -#else - if (FD_ISSET (fd, fdset)) - { - zlog (NULL, LOG_WARNING, "There is already %s fd [%d]", (dir == THREAD_READ) ? "read" : "write", fd); - return NULL; - } + if (thread) + { + pthread_mutex_lock (&thread->mtx); + { + thread->u.fd = fd; + if (dir == THREAD_READ) + m->read[thread->u.fd] = thread; + else + m->write[thread->u.fd] = thread; + } + pthread_mutex_unlock (&thread->mtx); - FD_SET (fd, fdset); - thread = thread_get (m, dir, func, arg, debugargpass); -#endif + if (t_ptr) + { + *t_ptr = thread; + thread->ref = t_ptr; + } + } - thread->u.fd = fd; - if (dir == THREAD_READ) - thread_add_fd (m->read, thread); - else - thread_add_fd (m->write, thread); + AWAKEN (m); + } + pthread_mutex_unlock (&m->mtx); return thread; } static struct thread * funcname_thread_add_timer_timeval (struct thread_master *m, - int (*func) (struct thread *), - int type, - void *arg, - struct timeval *time_relative, - debugargdef) + int (*func) (struct thread *), int type, void *arg, + struct timeval *time_relative, struct thread **t_ptr, debugargdef) { struct thread *thread; struct pqueue *queue; - struct timeval alarm_time; assert (m != NULL); assert (type == THREAD_TIMER || type == THREAD_BACKGROUND); assert (time_relative); - queue = ((type == THREAD_TIMER) ? m->timer : m->background); - thread = thread_get (m, type, func, arg, debugargpass); + pthread_mutex_lock (&m->mtx); + { + if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule + { + pthread_mutex_unlock (&m->mtx); + return NULL; + } - /* Do we need jitter here? */ - quagga_get_relative (NULL); - alarm_time.tv_sec = relative_time.tv_sec + time_relative->tv_sec; - alarm_time.tv_usec = relative_time.tv_usec + time_relative->tv_usec; - thread->u.sands = timeval_adjust(alarm_time); + queue = ((type == THREAD_TIMER) ? m->timer : m->background); + thread = thread_get (m, type, func, arg, debugargpass); + + pthread_mutex_lock (&thread->mtx); + { + monotime(&thread->u.sands); + timeradd(&thread->u.sands, time_relative, &thread->u.sands); + pqueue_enqueue(thread, queue); + if (t_ptr) + { + *t_ptr = thread; + thread->ref = t_ptr; + } + } + pthread_mutex_unlock (&thread->mtx); + + AWAKEN (m); + } + pthread_mutex_unlock (&m->mtx); - pqueue_enqueue(thread, queue); return thread; } @@ -879,9 +795,8 @@ funcname_thread_add_timer_timeval (struct thread_master *m, /* Add timer event thread. */ struct thread * funcname_thread_add_timer (struct thread_master *m, - int (*func) (struct thread *), - void *arg, long timer, - debugargdef) + int (*func) (struct thread *), void *arg, long timer, + struct thread **t_ptr, debugargdef) { struct timeval trel; @@ -890,16 +805,15 @@ funcname_thread_add_timer (struct thread_master *m, trel.tv_sec = timer; trel.tv_usec = 0; - return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, - &trel, debugargpass); + return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, &trel, + t_ptr, debugargpass); } /* Add timer event thread with "millisecond" resolution */ struct thread * funcname_thread_add_timer_msec (struct thread_master *m, - int (*func) (struct thread *), - void *arg, long timer, - debugargdef) + int (*func) (struct thread *), void *arg, long timer, + struct thread **t_ptr, debugargdef) { struct timeval trel; @@ -908,27 +822,25 @@ funcname_thread_add_timer_msec (struct thread_master *m, trel.tv_sec = timer / 1000; trel.tv_usec = 1000*(timer % 1000); - return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, - arg, &trel, debugargpass); + return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, &trel, + t_ptr, debugargpass); } /* Add timer event thread with "millisecond" resolution */ struct thread * funcname_thread_add_timer_tv (struct thread_master *m, - int (*func) (struct thread *), - void *arg, struct timeval *tv, - debugargdef) + int (*func) (struct thread *), void *arg, struct timeval *tv, + struct thread **t_ptr, debugargdef) { - return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, - arg, tv, debugargpass); + return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, tv, + t_ptr, debugargpass); } /* Add a background thread, with an optional millisec delay */ struct thread * funcname_thread_add_background (struct thread_master *m, - int (*func) (struct thread *), - void *arg, long delay, - debugargdef) + int (*func) (struct thread *), void *arg, long delay, + struct thread **t_ptr, debugargdef) { struct timeval trel; @@ -945,23 +857,45 @@ funcname_thread_add_background (struct thread_master *m, trel.tv_usec = 0; } - return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND, - arg, &trel, debugargpass); + return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND, arg, &trel, + t_ptr, debugargpass); } /* Add simple event thread. */ struct thread * funcname_thread_add_event (struct thread_master *m, - int (*func) (struct thread *), void *arg, int val, - debugargdef) + int (*func) (struct thread *), void *arg, int val, + struct thread **t_ptr, debugargdef) { struct thread *thread; assert (m != NULL); - thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass); - thread->u.val = val; - thread_list_add (&m->event, thread); + pthread_mutex_lock (&m->mtx); + { + if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule + { + pthread_mutex_unlock (&m->mtx); + return NULL; + } + + thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass); + pthread_mutex_lock (&thread->mtx); + { + thread->u.val = val; + thread_list_add (&m->event, thread); + } + pthread_mutex_unlock (&thread->mtx); + + if (t_ptr) + { + *t_ptr = thread; + thread->ref = t_ptr; + } + + AWAKEN (m); + } + pthread_mutex_unlock (&m->mtx); return thread; } @@ -969,10 +903,7 @@ funcname_thread_add_event (struct thread_master *m, static void thread_cancel_read_or_write (struct thread *thread, short int state) { -#if defined(HAVE_POLL_CALL) - nfds_t i; - - for (i=0;i<thread->master->handler.pfdcount;++i) + for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i) if (thread->master->handler.pfds[i].fd == thread->u.fd) { thread->master->handler.pfds[i].events &= ~(state); @@ -987,35 +918,34 @@ thread_cancel_read_or_write (struct thread *thread, short int state) return; } } -#endif - - fd_clear_read_write (thread); } -/* Cancel thread from scheduler. */ +/** + * Cancel thread from scheduler. + * + * This function is *NOT* MT-safe. DO NOT call it from any other pthread except + * the one which owns thread->master. You will crash. + */ void thread_cancel (struct thread *thread) { struct thread_list *list = NULL; struct pqueue *queue = NULL; struct thread **thread_array = NULL; - + + pthread_mutex_lock (&thread->mtx); + pthread_mutex_lock (&thread->master->mtx); + + assert (pthread_self() == thread->master->owner); + switch (thread->type) { case THREAD_READ: -#if defined (HAVE_POLL_CALL) thread_cancel_read_or_write (thread, POLLIN | POLLHUP); -#else - thread_cancel_read_or_write (thread, 0); -#endif thread_array = thread->master->read; break; case THREAD_WRITE: -#if defined (HAVE_POLL_CALL) thread_cancel_read_or_write (thread, POLLOUT | POLLHUP); -#else - thread_cancel_read_or_write (thread, 0); -#endif thread_array = thread->master->write; break; case THREAD_TIMER: @@ -1031,15 +961,14 @@ thread_cancel (struct thread *thread) queue = thread->master->background; break; default: - return; + goto done; break; } if (queue) { assert(thread->index >= 0); - assert(thread == queue->array[thread->index]); - pqueue_remove_at(thread->index, queue); + pqueue_remove (thread, queue); } else if (list) { @@ -1047,15 +976,21 @@ thread_cancel (struct thread *thread) } else if (thread_array) { - thread_delete_fd (thread_array, thread); + thread_array[thread->u.fd] = NULL; } else { assert(!"Thread should be either in queue or list or array!"); } - thread->type = THREAD_UNUSED; + if (thread->ref) + *thread->ref = NULL; + thread_add_unuse (thread->master, thread); + +done: + pthread_mutex_unlock (&thread->master->mtx); + pthread_mutex_unlock (&thread->mtx); } /* Delete all events which has argument value arg. */ @@ -1064,41 +999,52 @@ thread_cancel_event (struct thread_master *m, void *arg) { unsigned int ret = 0; struct thread *thread; + struct thread *t; - thread = m->event.head; - while (thread) - { - struct thread *t; - - t = thread; - thread = t->next; - - if (t->arg == arg) + pthread_mutex_lock (&m->mtx); + { + thread = m->event.head; + while (thread) + { + t = thread; + pthread_mutex_lock (&t->mtx); { - ret++; - thread_list_delete (&m->event, t); - t->type = THREAD_UNUSED; - thread_add_unuse (m, t); + thread = t->next; + + if (t->arg == arg) + { + ret++; + thread_list_delete (&m->event, t); + if (t->ref) + *t->ref = NULL; + thread_add_unuse (m, t); + } } - } - - /* thread can be on the ready list too */ - thread = m->ready.head; - while (thread) - { - struct thread *t; - - t = thread; - thread = t->next; + pthread_mutex_unlock (&t->mtx); + } - if (t->arg == arg) + /* thread can be on the ready list too */ + thread = m->ready.head; + while (thread) + { + t = thread; + pthread_mutex_lock (&t->mtx); { - ret++; - thread_list_delete (&m->ready, t); - t->type = THREAD_UNUSED; - thread_add_unuse (m, t); + thread = t->next; + + if (t->arg == arg) + { + ret++; + thread_list_delete (&m->ready, t); + if (t->ref) + *t->ref = NULL; + thread_add_unuse (m, t); + } } - } + pthread_mutex_unlock (&t->mtx); + } + } + pthread_mutex_unlock (&m->mtx); return ret; } @@ -1108,7 +1054,7 @@ thread_timer_wait (struct pqueue *queue, struct timeval *timer_val) if (queue->size) { struct thread *next_timer = queue->array[0]; - *timer_val = timeval_subtract (next_timer->u.sands, relative_time); + monotime_until(&next_timer->u.sands, timer_val); return timer_val; } return NULL; @@ -1116,16 +1062,16 @@ thread_timer_wait (struct pqueue *queue, struct timeval *timer_val) static struct thread * thread_run (struct thread_master *m, struct thread *thread, - struct thread *fetch) + struct thread *fetch) { *fetch = *thread; - thread->type = THREAD_UNUSED; thread_add_unuse (m, thread); return fetch; } static int -thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos) +thread_process_io_helper (struct thread_master *m, struct thread *thread, + short state, int pos) { struct thread **thread_array; @@ -1137,77 +1083,60 @@ thread_process_fds_helper (struct thread_master *m, struct thread *thread, threa else thread_array = m->write; - if (fd_is_set (thread, fdset, pos)) - { - fd_clear_read_write (thread); - thread_delete_fd (thread_array, thread); - thread_list_add (&m->ready, thread); - thread->type = THREAD_READY; -#if defined(HAVE_POLL_CALL) - thread->master->handler.pfds[pos].events &= ~(state); -#endif - return 1; - } - return 0; + thread_array[thread->u.fd] = NULL; + thread_list_add (&m->ready, thread); + thread->type = THREAD_READY; + /* if another pthread scheduled this file descriptor for the event we're + * responding to, no problem; we're getting to it now */ + thread->master->handler.pfds[pos].events &= ~(state); + return 1; } -#if defined(HAVE_POLL_CALL) - -/* check poll events */ static void -check_pollfds(struct thread_master *m, fd_set *readfd, int num) +thread_process_io (struct thread_master *m, struct pollfd *pfds, + unsigned int num, unsigned int count) { - nfds_t i = 0; - int ready = 0; - for (i = 0; i < m->handler.pfdcount && ready < num ; ++i) + unsigned int ready = 0; + + for (nfds_t i = 0; i < count && ready < num ; ++i) { - /* no event for current fd? immideatly continue */ - if(m->handler.pfds[i].revents == 0) + /* no event for current fd? immediately continue */ + if (pfds[i].revents == 0) continue; ready++; - /* POLLIN / POLLOUT process event */ - if (m->handler.pfds[i].revents & POLLIN) - thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i); - if (m->handler.pfds[i].revents & POLLOUT) - thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i); - - /* remove fd from list on POLLNVAL */ - if (m->handler.pfds[i].revents & POLLNVAL || - m->handler.pfds[i].revents & POLLHUP) + /* Unless someone has called thread_cancel from another pthread, the only + * thing that could have changed in m->handler.pfds while we were + * asleep is the .events field in a given pollfd. Barring thread_cancel() + * that value should be a superset of the values we have in our copy, so + * there's no need to update it. Similarily, barring deletion, the fd + * should still be a valid index into the master's pfds. */ + if (pfds[i].revents & (POLLIN | POLLHUP)) + thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN, i); + if (pfds[i].revents & POLLOUT) + thread_process_io_helper(m, m->write[pfds[i].fd], POLLOUT, i); + + /* if one of our file descriptors is garbage, remove the same from + * both pfds + update sizes and index */ + if (pfds[i].revents & POLLNVAL) { - memmove(m->handler.pfds+i, - m->handler.pfds+i+1, - (m->handler.pfdsize-i-1) * sizeof(struct pollfd)); - m->handler.pfdcount--; - i--; + memmove (m->handler.pfds + i, + m->handler.pfds + i + 1, + (m->handler.pfdcount - i - 1) * sizeof(struct pollfd)); + m->handler.pfdcount--; + + memmove (pfds + i, pfds + i + 1, + (count - i - 1) * sizeof(struct pollfd)); + count--; + i--; } - else - m->handler.pfds[i].revents = 0; } } -#endif - -static void -thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num) -{ -#if defined (HAVE_POLL_CALL) - check_pollfds (m, rset, num); -#else - int ready = 0, index; - - for (index = 0; index < m->fd_limit && ready < num; ++index) - { - ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0); - ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0); - } -#endif -} /* Add all timers that have popped to the ready list. */ static unsigned int -thread_timer_process (struct pqueue *queue, struct timeval *timenow) +thread_process_timers (struct pqueue *queue, struct timeval *timenow) { struct thread *thread; unsigned int ready = 0; @@ -1215,7 +1144,7 @@ thread_timer_process (struct pqueue *queue, struct timeval *timenow) while (queue->size) { thread = queue->array[0]; - if (timeval_cmp (*timenow, thread->u.sands) < 0) + if (timercmp (timenow, &thread->u.sands, <)) return ready; pqueue_dequeue(queue); thread->type = THREAD_READY; @@ -1250,26 +1179,32 @@ struct thread * thread_fetch (struct thread_master *m, struct thread *fetch) { struct thread *thread; - thread_fd_set readfd; - thread_fd_set writefd; - thread_fd_set exceptfd; + struct timeval now; struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 }; struct timeval timer_val_bg; struct timeval *timer_wait = &timer_val; struct timeval *timer_wait_bg; - while (1) + do { int num = 0; /* Signals pre-empt everything */ - quagga_sigevent_process (); + if (m->handle_signals) + quagga_sigevent_process (); + pthread_mutex_lock (&m->mtx); /* Drain the ready queue of already scheduled jobs, before scheduling * more. */ if ((thread = thread_trim_head (&m->ready)) != NULL) - return thread_run (m, thread, fetch); + { + fetch = thread_run (m, thread, fetch); + if (fetch->ref) + *fetch->ref = NULL; + pthread_mutex_unlock (&m->mtx); + return fetch; + } /* To be fair to all kinds of threads, and avoid starvation, we * need to be careful to consider all thread types for scheduling @@ -1279,45 +1214,56 @@ thread_fetch (struct thread_master *m, struct thread *fetch) /* Normal event are the next highest priority. */ thread_process (&m->event); - /* Structure copy. */ -#if !defined(HAVE_POLL_CALL) - readfd = fd_copy_fd_set(m->handler.readfd); - writefd = fd_copy_fd_set(m->handler.writefd); - exceptfd = fd_copy_fd_set(m->handler.exceptfd); -#endif - /* Calculate select wait timer if nothing else to do */ if (m->ready.count == 0) { - quagga_get_relative (NULL); timer_wait = thread_timer_wait (m->timer, &timer_val); timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg); if (timer_wait_bg && - (!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0))) + (!timer_wait || (timercmp (timer_wait, timer_wait_bg, >)))) timer_wait = timer_wait_bg; } - num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait); + if (timer_wait && timer_wait->tv_sec < 0) + { + timerclear(&timer_val); + timer_wait = &timer_val; + } + + /* copy pollfds so we can unlock during blocking calls to poll() */ + struct pollfd pfds[m->handler.pfdsize]; + unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp; + memcpy (pfds, m->handler.pfds, count * sizeof (struct pollfd)); + + pthread_mutex_unlock (&m->mtx); + { + num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait); + } + pthread_mutex_lock (&m->mtx); /* Signals should get quick treatment */ if (num < 0) { if (errno == EINTR) - continue; /* signal received - process it */ - zlog_warn ("select() error: %s", safe_strerror (errno)); + { + pthread_mutex_unlock (&m->mtx); + continue; /* signal received - process it */ + } + zlog_warn ("poll() error: %s", safe_strerror (errno)); + pthread_mutex_unlock (&m->mtx); return NULL; } /* Check foreground timers. Historically, they have had higher - priority than I/O threads, so let's push them onto the ready - list in front of the I/O threads. */ - quagga_get_relative (NULL); - thread_timer_process (m->timer, &relative_time); + * priority than I/O threads, so let's push them onto the ready + * list in front of the I/O threads. */ + monotime(&now); + thread_process_timers (m->timer, &now); /* Got IO, process it */ if (num > 0) - thread_process_fds (m, &readfd, &writefd, num); + thread_process_io (m, pfds, num, count); #if 0 /* If any threads were made ready above (I/O or foreground timer), @@ -1325,15 +1271,32 @@ thread_fetch (struct thread_master *m, struct thread *fetch) list at this time. If this is code is uncommented, then background timer threads will not run unless there is nothing else to do. */ if ((thread = thread_trim_head (&m->ready)) != NULL) - return thread_run (m, thread, fetch); + { + fetch = thread_run (m, thread, fetch); + if (fetch->ref) + *fetch->ref = NULL; + pthread_mutex_unlock (&m->mtx); + return fetch; + } #endif /* Background timer/events, lowest priority */ - thread_timer_process (m->background, &relative_time); + thread_process_timers (m->background, &now); if ((thread = thread_trim_head (&m->ready)) != NULL) - return thread_run (m, thread, fetch); - } + { + fetch = thread_run (m, thread, fetch); + if (fetch->ref) + *fetch->ref = NULL; + pthread_mutex_unlock (&m->mtx); + return fetch; + } + + pthread_mutex_unlock (&m->mtx); + + } while (m->spin); + + return NULL; } unsigned long @@ -1358,31 +1321,30 @@ thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime) int thread_should_yield (struct thread *thread) { - quagga_get_relative (NULL); - return (timeval_elapsed(relative_time, thread->real) > - thread->yield); + int result; + pthread_mutex_lock (&thread->mtx); + { + result = monotime_since(&thread->real, NULL) > (int64_t)thread->yield; + } + pthread_mutex_unlock (&thread->mtx); + return result; } void thread_set_yield_time (struct thread *thread, unsigned long yield_time) { - thread->yield = yield_time; + pthread_mutex_lock (&thread->mtx); + { + thread->yield = yield_time; + } + pthread_mutex_unlock (&thread->mtx); } void thread_getrusage (RUSAGE_T *r) { - quagga_get_relative (NULL); + monotime(&r->real); getrusage(RUSAGE_SELF, &(r->cpu)); - r->real = relative_time; - -#ifdef HAVE_CLOCK_MONOTONIC - /* quagga_get_relative() only updates recent_time if gettimeofday - * based, not when using CLOCK_MONOTONIC. As we export recent_time - * and guarantee to update it before threads are run... - */ - quagga_gettimeofday(&recent_time); -#endif /* HAVE_CLOCK_MONOTONIC */ } struct thread *thread_current = NULL; @@ -1396,23 +1358,6 @@ thread_call (struct thread *thread) unsigned long realtime, cputime; RUSAGE_T before, after; - /* Cache a pointer to the relevant cpu history thread, if the thread - * does not have it yet. - * - * Callers submitting 'dummy threads' hence must take care that - * thread->cpu is NULL - */ - if (!thread->hist) - { - struct cpu_thread_history tmp; - - tmp.func = thread->func; - tmp.funcname = thread->funcname; - - thread->hist = hash_get (cpu_record, &tmp, - (void * (*) (void *))cpu_record_hash_alloc); - } - GETRUSAGE (&before); thread->real = before.real; @@ -1450,29 +1395,36 @@ thread_call (struct thread *thread) } /* Execute thread */ -struct thread * +void funcname_thread_execute (struct thread_master *m, int (*func)(struct thread *), void *arg, int val, debugargdef) { - struct thread dummy; + struct cpu_thread_history tmp; + struct thread dummy; memset (&dummy, 0, sizeof (struct thread)); + pthread_mutex_init (&dummy.mtx, NULL); dummy.type = THREAD_EVENT; dummy.add_type = THREAD_EXECUTE; dummy.master = NULL; - dummy.func = func; dummy.arg = arg; dummy.u.val = val; - dummy.funcname = funcname; + tmp.func = dummy.func = func; + tmp.funcname = dummy.funcname = funcname; + pthread_mutex_lock (&cpu_record_mtx); + { + dummy.hist = hash_get (cpu_record, &tmp, + (void * (*) (void *))cpu_record_hash_alloc); + } + pthread_mutex_unlock (&cpu_record_mtx); + dummy.schedfrom = schedfrom; dummy.schedfrom_line = fromln; thread_call (&dummy); - - return NULL; } |
