diff options
| author | Daniel Walton <dwalton@cumulusnetworks.com> | 2017-06-30 17:52:56 +0000 |
|---|---|---|
| committer | Daniel Walton <dwalton@cumulusnetworks.com> | 2017-06-30 17:52:56 +0000 |
| commit | 1161690b93b48fbd07f4ee25c1261574db8d71c5 (patch) | |
| tree | 7ffbe5c3b333b1fe0b8a3f042d8b1af602d48019 /lib/thread.c | |
| parent | ab782c96f881b1fdd59f52ba972cd82b5eeadc66 (diff) | |
| parent | 5fca4e3635c2778e8349bce0eaf944c26913d321 (diff) | |
Merge branch 'master' of https://github.com/dwalton76/frr into bgpd-ipv4-plus-label-misc3
Conflicts:
bgpd/bgp_route.c
Diffstat (limited to 'lib/thread.c')
| -rw-r--r-- | lib/thread.c | 934 |
1 files changed, 561 insertions, 373 deletions
diff --git a/lib/thread.c b/lib/thread.c index bf3500fd8b..801168a799 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -47,16 +47,15 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats") write (m->io_pipe[1], &wakebyte, 1); \ } while (0); -static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER; -static struct hash *cpu_record = NULL; +/* control variable for initializer */ +pthread_once_t init_once = PTHREAD_ONCE_INIT; +pthread_key_t thread_current; + +pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER; +static struct list *masters; -static unsigned long -timeval_elapsed (struct timeval a, struct timeval b) -{ - return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO) - + (a.tv_usec - b.tv_usec)); -} +/* CLI start ---------------------------------------------------------------- */ static unsigned int cpu_record_hash_key (struct cpu_thread_history *a) { @@ -96,22 +95,22 @@ vty_out_cpu_thread_history(struct vty* vty, a->total_active, a->cpu.total/1000, a->cpu.total%1000, a->total_calls, a->cpu.total/a->total_calls, a->cpu.max, a->real.total/a->total_calls, a->real.max); - vty_out(vty, " %c%c%c%c%c %s%s", + vty_outln (vty, " %c%c%c%c%c %s", a->types & (1 << THREAD_READ) ? 'R':' ', a->types & (1 << THREAD_WRITE) ? 'W':' ', a->types & (1 << THREAD_TIMER) ? 'T':' ', a->types & (1 << THREAD_EVENT) ? 'E':' ', a->types & (1 << THREAD_EXECUTE) ? 'X':' ', - a->funcname, VTY_NEWLINE); + a->funcname); } static void -cpu_record_hash_print(struct hash_backet *bucket, - void *args[]) +cpu_record_hash_print(struct hash_backet *bucket, void *args[]) { struct cpu_thread_history *totals = args[0]; struct vty *vty = args[1]; thread_type *filter = args[2]; + struct cpu_thread_history *a = bucket->data; if ( !(a->types & *filter) ) @@ -132,169 +131,174 @@ cpu_record_print(struct vty *vty, thread_type filter) { struct cpu_thread_history tmp; void *args[3] = {&tmp, vty, &filter}; + struct thread_master *m; + struct listnode *ln; memset(&tmp, 0, sizeof tmp); tmp.funcname = "TOTAL"; tmp.types = filter; - vty_out(vty, "%21s %18s %18s%s", - "", "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE); - vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs"); - vty_out(vty, " Avg uSec Max uSecs"); - vty_out(vty, " Type Thread%s", VTY_NEWLINE); - - pthread_mutex_lock (&cpu_record_mtx); + pthread_mutex_lock (&masters_mtx); { - hash_iterate(cpu_record, - (void(*)(struct hash_backet*,void*))cpu_record_hash_print, - args); + for (ALL_LIST_ELEMENTS_RO (masters, ln, m)) { + const char *name = m->name ? m->name : "main"; + + char underline[strlen(name) + 1]; + memset (underline, '-', sizeof (underline)); + underline[sizeof(underline)] = '\0'; + + vty_out (vty, VTYNL); + vty_outln(vty, "Showing statistics for pthread %s", name); + vty_outln(vty, "-------------------------------%s", underline); + vty_outln(vty, "%21s %18s %18s", "", "CPU (user+system):", "Real (wall-clock):"); + vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs"); + vty_out(vty, " Avg uSec Max uSecs"); + vty_outln(vty, " Type Thread"); + + if (m->cpu_record->count) + hash_iterate(m->cpu_record, + (void (*)(struct hash_backet *, void *)) + cpu_record_hash_print, + args); + else + vty_outln(vty, "No data to display yet."); + + vty_out(vty, VTYNL); + } } - pthread_mutex_unlock (&cpu_record_mtx); + pthread_mutex_unlock (&masters_mtx); + + vty_out(vty, VTYNL); + vty_outln(vty, "Total thread statistics"); + vty_outln(vty, "-------------------------"); + vty_outln(vty, "%21s %18s %18s", "", "CPU (user+system):", "Real (wall-clock):"); + vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs"); + vty_out(vty, " Avg uSec Max uSecs"); + vty_outln(vty, " Type Thread"); if (tmp.total_calls > 0) vty_out_cpu_thread_history(vty, &tmp); } -DEFUN (show_thread_cpu, - show_thread_cpu_cmd, - "show thread cpu [FILTER]", - SHOW_STR - "Thread information\n" - "Thread CPU usage\n" - "Display filter (rwtexb)\n") -{ - int idx_filter = 3; - int i = 0; - thread_type filter = (thread_type) -1U; - - if (argc > 3) - { - filter = 0; - while (argv[idx_filter]->arg[i] != '\0') - { - switch ( argv[idx_filter]->arg[i] ) - { - case 'r': - case 'R': - filter |= (1 << THREAD_READ); - break; - case 'w': - case 'W': - filter |= (1 << THREAD_WRITE); - break; - case 't': - case 'T': - filter |= (1 << THREAD_TIMER); - break; - case 'e': - case 'E': - filter |= (1 << THREAD_EVENT); - break; - case 'x': - case 'X': - filter |= (1 << THREAD_EXECUTE); - break; - default: - break; - } - ++i; - } - if (filter == 0) - { - vty_out(vty, "Invalid filter \"%s\" specified," - " must contain at least one of 'RWTEXB'%s", - argv[idx_filter]->arg, VTY_NEWLINE); - return CMD_WARNING; - } - } - - cpu_record_print(vty, filter); - return CMD_SUCCESS; -} - static void -cpu_record_hash_clear (struct hash_backet *bucket, - void *args) +cpu_record_hash_clear (struct hash_backet *bucket, void *args[]) { - thread_type *filter = args; + thread_type *filter = args[0]; + struct hash *cpu_record = args[1]; + struct cpu_thread_history *a = bucket->data; if ( !(a->types & *filter) ) return; - pthread_mutex_lock (&cpu_record_mtx); - { - hash_release (cpu_record, bucket->data); - } - pthread_mutex_unlock (&cpu_record_mtx); + hash_release (cpu_record, bucket->data); } static void cpu_record_clear (thread_type filter) { thread_type *tmp = &filter; + struct thread_master *m; + struct listnode *ln; - pthread_mutex_lock (&cpu_record_mtx); + pthread_mutex_lock (&masters_mtx); { - hash_iterate (cpu_record, - (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear, - tmp); + for (ALL_LIST_ELEMENTS_RO (masters, ln, m)) { + pthread_mutex_lock (&m->mtx); + { + void *args[2] = { tmp, m->cpu_record }; + hash_iterate (m->cpu_record, + (void (*) (struct hash_backet*,void*)) + cpu_record_hash_clear, + args); + } + pthread_mutex_unlock (&m->mtx); + } } - pthread_mutex_unlock (&cpu_record_mtx); + pthread_mutex_unlock (&masters_mtx); +} + +static thread_type +parse_filter (const char *filterstr) +{ + int i = 0; + int filter = 0; + + while (filterstr[i] != '\0') + { + switch (filterstr[i]) + { + case 'r': + case 'R': + filter |= (1 << THREAD_READ); + break; + case 'w': + case 'W': + filter |= (1 << THREAD_WRITE); + break; + case 't': + case 'T': + filter |= (1 << THREAD_TIMER); + break; + case 'e': + case 'E': + filter |= (1 << THREAD_EVENT); + break; + case 'x': + case 'X': + filter |= (1 << THREAD_EXECUTE); + break; + default: + break; + } + ++i; + } + return filter; +} + +DEFUN (show_thread_cpu, + show_thread_cpu_cmd, + "show thread cpu [FILTER]", + SHOW_STR + "Thread information\n" + "Thread CPU usage\n" + "Display filter (rwtexb)\n") +{ + thread_type filter = (thread_type) -1U; + int idx = 0; + + if (argv_find (argv, argc, "FILTER", &idx)) { + filter = parse_filter (argv[idx]->arg); + if (!filter) { + vty_outln(vty, "Invalid filter \"%s\" specified; must contain at least" + "one of 'RWTEXB'%s", argv[idx]->arg); + return CMD_WARNING; + } + } + + cpu_record_print(vty, filter); + return CMD_SUCCESS; } DEFUN (clear_thread_cpu, clear_thread_cpu_cmd, "clear thread cpu [FILTER]", - "Clear stored data\n" + "Clear stored data in all pthreads\n" "Thread information\n" "Thread CPU usage\n" "Display filter (rwtexb)\n") { - int idx_filter = 3; - int i = 0; thread_type filter = (thread_type) -1U; - - if (argc > 3) - { - filter = 0; - while (argv[idx_filter]->arg[i] != '\0') - { - switch ( argv[idx_filter]->arg[i] ) - { - case 'r': - case 'R': - filter |= (1 << THREAD_READ); - break; - case 'w': - case 'W': - filter |= (1 << THREAD_WRITE); - break; - case 't': - case 'T': - filter |= (1 << THREAD_TIMER); - break; - case 'e': - case 'E': - filter |= (1 << THREAD_EVENT); - break; - case 'x': - case 'X': - filter |= (1 << THREAD_EXECUTE); - break; - default: - break; - } - ++i; - } - if (filter == 0) - { - vty_out(vty, "Invalid filter \"%s\" specified," - " must contain at least one of 'RWTEXB'%s", - argv[idx_filter]->arg, VTY_NEWLINE); - return CMD_WARNING; - } + int idx = 0; + + if (argv_find (argv, argc, "FILTER", &idx)) { + filter = parse_filter (argv[idx]->arg); + if (!filter) { + vty_outln(vty, "Invalid filter \"%s\" specified; must contain at least" + "one of 'RWTEXB'%s", argv[idx]->arg); + return CMD_WARNING; } + } cpu_record_clear (filter); return CMD_SUCCESS; @@ -306,6 +310,8 @@ thread_cmd_init (void) install_element (VIEW_NODE, &show_thread_cpu_cmd); install_element (ENABLE_NODE, &clear_thread_cpu_cmd); } +/* CLI end ------------------------------------------------------------------ */ + static int thread_timer_cmp(void *a, void *b) @@ -328,30 +334,43 @@ thread_timer_update(void *node, int actual_position) thread->index = actual_position; } +static void +cancelreq_del (void *cr) +{ + XFREE (MTYPE_TMP, cr); +} + +/* initializer, only ever called once */ +static void initializer () +{ + if (!masters) + masters = list_new(); + + pthread_key_create (&thread_current, NULL); +} + /* Allocate new thread master. */ struct thread_master * -thread_master_create (void) +thread_master_create (const char *name) { struct thread_master *rv; struct rlimit limit; - getrlimit(RLIMIT_NOFILE, &limit); - - pthread_mutex_lock (&cpu_record_mtx); - { - if (cpu_record == NULL) - cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key, - (int (*) (const void *, const void *)) - cpu_record_hash_cmp); - } - pthread_mutex_unlock (&cpu_record_mtx); + pthread_once (&init_once, &initializer); rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master)); if (rv == NULL) return NULL; + /* Initialize master mutex */ pthread_mutex_init (&rv->mtx, NULL); + pthread_cond_init (&rv->cancel_cond, NULL); + + /* Set name */ + rv->name = name ? XSTRDUP (MTYPE_THREAD_MASTER, name) : NULL; + /* Initialize I/O task data structures */ + getrlimit(RLIMIT_NOFILE, &limit); rv->fd_limit = (int)limit.rlim_cur; rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit); if (rv->read == NULL) @@ -359,7 +378,6 @@ thread_master_create (void) XFREE (MTYPE_THREAD_MASTER, rv); return NULL; } - rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit); if (rv->write == NULL) { @@ -368,17 +386,32 @@ thread_master_create (void) return NULL; } + rv->cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key, + (int (*) (const void *, const void *)) + cpu_record_hash_cmp); + + /* Initialize the timer queues */ rv->timer = pqueue_create(); rv->timer->cmp = thread_timer_cmp; rv->timer->update = thread_timer_update; + + /* Initialize thread_fetch() settings */ rv->spin = true; rv->handle_signals = true; + + /* Set pthread owner, should be updated by actual owner */ rv->owner = pthread_self(); + rv->cancel_req = list_new (); + rv->cancel_req->del = cancelreq_del; + rv->canceled = true; + + /* Initialize pipe poker */ pipe (rv->io_pipe); set_nonblocking (rv->io_pipe[0]); set_nonblocking (rv->io_pipe[1]); + /* Initialize data structures for poll() */ rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER, @@ -386,6 +419,13 @@ thread_master_create (void) rv->handler.copy = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct pollfd) * rv->handler.pfdsize); + /* add to list */ + pthread_mutex_lock (&masters_mtx); + { + listnode_add (masters, rv); + } + pthread_mutex_unlock (&masters_mtx); + return rv; } @@ -533,21 +573,15 @@ thread_master_free (struct thread_master *m) pthread_mutex_destroy (&m->mtx); close (m->io_pipe[0]); close (m->io_pipe[1]); + list_delete (m->cancel_req); + + hash_clean (m->cpu_record, cpu_record_hash_free); + hash_free (m->cpu_record); + m->cpu_record = NULL; XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); XFREE (MTYPE_THREAD_MASTER, m->handler.copy); XFREE (MTYPE_THREAD_MASTER, m); - - pthread_mutex_lock (&cpu_record_mtx); - { - if (cpu_record) - { - hash_clean (cpu_record, cpu_record_hash_free); - hash_free (cpu_record); - cpu_record = NULL; - } - } - pthread_mutex_unlock (&cpu_record_mtx); } /* Return remain time in second. */ @@ -619,12 +653,8 @@ thread_get (struct thread_master *m, u_char type, { tmp.func = func; tmp.funcname = funcname; - pthread_mutex_lock (&cpu_record_mtx); - { - thread->hist = hash_get (cpu_record, &tmp, - (void * (*) (void *))cpu_record_hash_alloc); - } - pthread_mutex_unlock (&cpu_record_mtx); + thread->hist = hash_get (m->cpu_record, &tmp, + (void * (*) (void *))cpu_record_hash_alloc); } thread->hist->total_active++; thread->func = func; @@ -637,7 +667,7 @@ thread_get (struct thread_master *m, u_char type, static int fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, - nfds_t count, struct timeval *timer_wait) + nfds_t count, const struct timeval *timer_wait) { /* If timer_wait is null here, that means poll() should block indefinitely, * unless the thread_master has overriden it by setting ->selectpoll_timeout. @@ -665,7 +695,7 @@ fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, num = poll (pfds, count + 1, timeout); - static unsigned char trash[64]; + unsigned char trash[64]; if (num > 0 && pfds[count].revents != 0 && num--) while (read (m->io_pipe[0], &trash, sizeof (trash)) > 0); @@ -864,150 +894,287 @@ funcname_thread_add_event (struct thread_master *m, return thread; } +/* Thread cancellation ------------------------------------------------------ */ + +/** + * NOT's out the .events field of pollfd corresponding to the given file + * descriptor. The event to be NOT'd is passed in the 'state' parameter. + * + * This needs to happen for both copies of pollfd's. See 'thread_fetch' + * implementation for details. + * + * @param master + * @param fd + * @param state the event to cancel. One or more (OR'd together) of the + * following: + * - POLLIN + * - POLLOUT + */ static void -thread_cancel_read_or_write (struct thread *thread, short int state) +thread_cancel_rw (struct thread_master *master, int fd, short state) { - for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i) - if (thread->master->handler.pfds[i].fd == thread->u.fd) - { - thread->master->handler.pfds[i].events &= ~(state); + /* Cancel POLLHUP too just in case some bozo set it */ + state |= POLLHUP; - /* remove thread fds from pfd list */ - if (thread->master->handler.pfds[i].events == 0) - { - memmove(thread->master->handler.pfds+i, - thread->master->handler.pfds+i+1, - (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd)); - thread->master->handler.pfdcount--; - return; - } - } + /* find the index of corresponding pollfd */ + nfds_t i; + + for (i = 0; i < master->handler.pfdcount; i++) + if (master->handler.pfds[i].fd == fd) + break; + + /* NOT out event. */ + master->handler.pfds[i].events &= ~(state); + + /* If all events are canceled, delete / resize the pollfd array. */ + if (master->handler.pfds[i].events == 0) + { + memmove(master->handler.pfds + i, master->handler.pfds + i + 1, + (master->handler.pfdcount - i - 1) * sizeof (struct pollfd)); + master->handler.pfdcount--; + } + + /* If we have the same pollfd in the copy, perform the same operations, + * otherwise return. */ + if (i >= master->handler.copycount) + return; + + master->handler.copy[i].events &= ~(state); + + if (master->handler.copy[i].events == 0) + { + memmove(master->handler.copy + i, master->handler.copy + i + 1, + (master->handler.copycount - i - 1) * sizeof (struct pollfd)); + master->handler.copycount--; + } } /** - * Cancel thread from scheduler. + * Process cancellation requests. * - * This function is *NOT* MT-safe. DO NOT call it from any other pthread except - * the one which owns thread->master. You will crash. + * This may only be run from the pthread which owns the thread_master. + * + * @param master the thread master to process + * @REQUIRE master->mtx */ -void -thread_cancel (struct thread *thread) +static void +do_thread_cancel (struct thread_master *master) { struct thread_list *list = NULL; struct pqueue *queue = NULL; struct thread **thread_array = NULL; + struct thread *thread; - pthread_mutex_lock (&thread->mtx); - pthread_mutex_lock (&thread->master->mtx); + struct cancel_req *cr; + struct listnode *ln; + for (ALL_LIST_ELEMENTS_RO (master->cancel_req, ln, cr)) + { + /* If this is an event object cancellation, linear search through event + * list deleting any events which have the specified argument. We also + * need to check every thread in the ready queue. */ + if (cr->eventobj) + { + struct thread *t; + thread = master->event.head; + + while (thread) + { + t = thread; + thread = t->next; + + if (t->arg == cr->eventobj) + { + thread_list_delete (&master->event, t); + if (t->ref) + *t->ref = NULL; + thread_add_unuse (master, t); + } + } - assert (pthread_self() == thread->master->owner); + thread = master->ready.head; + while (thread) + { + t = thread; + thread = t->next; + + if (t->arg == cr->eventobj) + { + thread_list_delete (&master->ready, t); + if (t->ref) + *t->ref = NULL; + thread_add_unuse (master, t); + } + } + continue; + } - switch (thread->type) - { - case THREAD_READ: - thread_cancel_read_or_write (thread, POLLIN | POLLHUP); - thread_array = thread->master->read; - break; - case THREAD_WRITE: - thread_cancel_read_or_write (thread, POLLOUT | POLLHUP); - thread_array = thread->master->write; - break; - case THREAD_TIMER: - queue = thread->master->timer; - break; - case THREAD_EVENT: - list = &thread->master->event; - break; - case THREAD_READY: - list = &thread->master->ready; - break; - default: - goto done; - break; - } + /* The pointer varies depending on whether the cancellation request was + * made asynchronously or not. If it was, we need to check whether the + * thread even exists anymore before cancelling it. */ + thread = (cr->thread) ? cr->thread : *cr->threadref; - if (queue) - { - assert(thread->index >= 0); - pqueue_remove (thread, queue); - } - else if (list) - { - thread_list_delete (list, thread); - } - else if (thread_array) - { - thread_array[thread->u.fd] = NULL; - } - else - { - assert(!"Thread should be either in queue or list or array!"); + if (!thread) + continue; + + /* Determine the appropriate queue to cancel the thread from */ + switch (thread->type) + { + case THREAD_READ: + thread_cancel_rw (master, thread->u.fd, POLLIN); + thread_array = master->read; + break; + case THREAD_WRITE: + thread_cancel_rw (master, thread->u.fd, POLLOUT); + thread_array = master->write; + break; + case THREAD_TIMER: + queue = master->timer; + break; + case THREAD_EVENT: + list = &master->event; + break; + case THREAD_READY: + list = &master->ready; + break; + default: + continue; + break; + } + + if (queue) + { + assert(thread->index >= 0); + pqueue_remove (thread, queue); + } + else if (list) + { + thread_list_delete (list, thread); + } + else if (thread_array) + { + thread_array[thread->u.fd] = NULL; + } + else + { + assert(!"Thread should be either in queue or list or array!"); + } + + if (thread->ref) + *thread->ref = NULL; + + thread_add_unuse (thread->master, thread); } - if (thread->ref) - *thread->ref = NULL; + /* Delete and free all cancellation requests */ + list_delete_all_node (master->cancel_req); + + /* Wake up any threads which may be blocked in thread_cancel_async() */ + master->canceled = true; + pthread_cond_broadcast (&master->cancel_cond); +} - thread_add_unuse (thread->master, thread); +/** + * Cancel any events which have the specified argument. + * + * MT-Unsafe + * + * @param m the thread_master to cancel from + * @param arg the argument passed when creating the event + */ +void +thread_cancel_event (struct thread_master *master, void *arg) +{ + assert (master->owner == pthread_self()); -done: + pthread_mutex_lock (&master->mtx); + { + struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->eventobj = arg; + listnode_add (master->cancel_req, cr); + do_thread_cancel(master); + } + pthread_mutex_unlock (&master->mtx); +} + +/** + * Cancel a specific task. + * + * MT-Unsafe + * + * @param thread task to cancel + */ +void +thread_cancel (struct thread *thread) +{ + assert (thread->master->owner == pthread_self()); + + pthread_mutex_lock (&thread->master->mtx); + { + struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->thread = thread; + listnode_add (thread->master->cancel_req, cr); + do_thread_cancel (thread->master); + } pthread_mutex_unlock (&thread->master->mtx); - pthread_mutex_unlock (&thread->mtx); } -/* Delete all events which has argument value arg. */ -unsigned int -thread_cancel_event (struct thread_master *m, void *arg) +/** + * Asynchronous cancellation. + * + * Called with either a struct thread ** or void * to an event argument, + * this function posts the correct cancellation request and blocks until it is + * serviced. + * + * If the thread is currently running, execution blocks until it completes. + * + * The last two parameters are mutually exclusive, i.e. if you pass one the + * other must be NULL. + * + * When the cancellation procedure executes on the target thread_master, the + * thread * provided is checked for nullity. If it is null, the thread is + * assumed to no longer exist and the cancellation request is a no-op. Thus + * users of this API must pass a back-reference when scheduling the original + * task. + * + * MT-Safe + * + * @param master the thread master with the relevant event / task + * @param thread pointer to thread to cancel + * @param eventobj the event + */ +void +thread_cancel_async (struct thread_master *master, struct thread **thread, + void *eventobj) { - unsigned int ret = 0; - struct thread *thread; - struct thread *t; + assert (!(thread && eventobj) && (thread || eventobj)); + assert (master->owner != pthread_self()); - pthread_mutex_lock (&m->mtx); + pthread_mutex_lock (&master->mtx); { - thread = m->event.head; - while (thread) - { - t = thread; - pthread_mutex_lock (&t->mtx); - { - thread = t->next; + master->canceled = false; - if (t->arg == arg) - { - ret++; - thread_list_delete (&m->event, t); - if (t->ref) - *t->ref = NULL; - thread_add_unuse (m, t); - } - } - pthread_mutex_unlock (&t->mtx); + if (thread) + { + struct cancel_req *cr = + XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->threadref = thread; + listnode_add (master->cancel_req, cr); } - - /* thread can be on the ready list too */ - thread = m->ready.head; - while (thread) + else if (eventobj) { - t = thread; - pthread_mutex_lock (&t->mtx); - { - thread = t->next; - - if (t->arg == arg) - { - ret++; - thread_list_delete (&m->ready, t); - if (t->ref) - *t->ref = NULL; - thread_add_unuse (m, t); - } - } - pthread_mutex_unlock (&t->mtx); + struct cancel_req *cr = + XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->eventobj = eventobj; + listnode_add (master->cancel_req, cr); } + AWAKEN (master); + + while (!master->canceled) + pthread_cond_wait (&master->cancel_cond, &master->mtx); } - pthread_mutex_unlock (&m->mtx); - return ret; + pthread_mutex_unlock (&master->mtx); } +/* ------------------------------------------------------------------------- */ static struct timeval * thread_timer_wait (struct pqueue *queue, struct timeval *timer_val) @@ -1053,13 +1220,22 @@ thread_process_io_helper (struct thread_master *m, struct thread *thread, return 1; } +/** + * Process I/O events. + * + * Walks through file descriptor array looking for those pollfds whose .revents + * field has something interesting. Deletes any invalid file descriptors. + * + * @param m the thread master + * @param num the number of active file descriptors (return value of poll()) + */ static void -thread_process_io (struct thread_master *m, struct pollfd *pfds, - unsigned int num, unsigned int count) +thread_process_io (struct thread_master *m, unsigned int num) { unsigned int ready = 0; + struct pollfd *pfds = m->handler.copy; - for (nfds_t i = 0; i < count && ready < num ; ++i) + for (nfds_t i = 0; i < m->handler.copycount && ready < num ; ++i) { /* no event for current fd? immediately continue */ if (pfds[i].revents == 0) @@ -1088,8 +1264,9 @@ thread_process_io (struct thread_master *m, struct pollfd *pfds, m->handler.pfdcount--; memmove (pfds + i, pfds + i + 1, - (count - i - 1) * sizeof(struct pollfd)); - count--; + (m->handler.copycount - i - 1) * sizeof(struct pollfd)); + m->handler.copycount--; + i--; } } @@ -1139,98 +1316,115 @@ thread_process (struct thread_list *list) struct thread * thread_fetch (struct thread_master *m, struct thread *fetch) { - struct thread *thread; + struct thread *thread = NULL; struct timeval now; - struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 }; - struct timeval *timer_wait = &timer_val; + struct timeval zerotime = { 0, 0 }; + struct timeval tv; + struct timeval *tw = NULL; + + int num = 0; + + do { + /* Handle signals if any */ + if (m->handle_signals) + quagga_sigevent_process (); + + pthread_mutex_lock (&m->mtx); + + /* Process any pending cancellation requests */ + do_thread_cancel (m); + + /* Post events to ready queue. This must come before the following block + * since events should occur immediately */ + thread_process (&m->event); + + /* If there are no tasks on the ready queue, we will poll() until a timer + * expires or we receive I/O, whichever comes first. The strategy for doing + * this is: + * + * - If there are events pending, set the poll() timeout to zero + * - If there are no events pending, but there are timers pending, set the + * timeout to the smallest remaining time on any timer + * - If there are neither timers nor events pending, but there are file + * descriptors pending, block indefinitely in poll() + * - If nothing is pending, it's time for the application to die + * + * In every case except the last, we need to hit poll() at least once per + * loop to avoid starvation by events */ + + if (m->ready.count == 0) + tw = thread_timer_wait (m->timer, &tv); + + if (m->ready.count != 0 || (tw && !timercmp (tw, &zerotime, >))) + tw = &zerotime; + + if (!tw && m->handler.pfdcount == 0) + { /* die */ + pthread_mutex_unlock (&m->mtx); + fetch = NULL; + break; + } - do + /* Copy pollfd array + # active pollfds in it. Not necessary to copy + * the array size as this is fixed. */ + m->handler.copycount = m->handler.pfdcount; + memcpy (m->handler.copy, m->handler.pfds, + m->handler.copycount * sizeof (struct pollfd)); + + pthread_mutex_unlock (&m->mtx); { - int num = 0; + num = fd_poll (m, m->handler.copy, m->handler.pfdsize, + m->handler.copycount, tw); + } + pthread_mutex_lock (&m->mtx); - /* Signals pre-empt everything */ - if (m->handle_signals) - quagga_sigevent_process (); - - pthread_mutex_lock (&m->mtx); - /* Drain the ready queue of already scheduled jobs, before scheduling - * more. - */ - if ((thread = thread_trim_head (&m->ready)) != NULL) - { - fetch = thread_run (m, thread, fetch); - if (fetch->ref) - *fetch->ref = NULL; - pthread_mutex_unlock (&m->mtx); - return fetch; - } - - /* To be fair to all kinds of threads, and avoid starvation, we - * need to be careful to consider all thread types for scheduling - * in each quanta. I.e. we should not return early from here on. - */ - - /* Normal event are the next highest priority. */ - thread_process (&m->event); - - /* Calculate select wait timer if nothing else to do */ - if (m->ready.count == 0) - { - timer_wait = thread_timer_wait (m->timer, &timer_val); - } + /* Handle any errors received in poll() */ + if (num < 0) + { + if (errno == EINTR) + { + pthread_mutex_unlock (&m->mtx); + continue; /* loop around to signal handler */ + } - if (timer_wait && timer_wait->tv_sec < 0) - { - timerclear(&timer_val); - timer_wait = &timer_val; - } + /* else die */ + zlog_warn ("poll() error: %s", safe_strerror (errno)); + pthread_mutex_unlock (&m->mtx); + fetch = NULL; + break; + } - unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp; - memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd)); + /* Since we could have received more cancellation requests during poll(), process those */ + do_thread_cancel (m); - pthread_mutex_unlock (&m->mtx); + /* Post timers to ready queue. */ + monotime(&now); + thread_process_timers (m->timer, &now); + + /* Post I/O to ready queue. */ + if (num > 0) + thread_process_io (m, num); + + /* If we have a ready task, break the loop and return it to the caller */ + if ((thread = thread_trim_head (&m->ready))) { - num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait); + fetch = thread_run (m, thread, fetch); + if (fetch->ref) + *fetch->ref = NULL; } - pthread_mutex_lock (&m->mtx); - /* Signals should get quick treatment */ - if (num < 0) - { - if (errno == EINTR) - { - pthread_mutex_unlock (&m->mtx); - continue; /* signal received - process it */ - } - zlog_warn ("poll() error: %s", safe_strerror (errno)); - pthread_mutex_unlock (&m->mtx); - return NULL; - } + pthread_mutex_unlock (&m->mtx); - /* Check foreground timers. Historically, they have had higher - * priority than I/O threads, so let's push them onto the ready - * list in front of the I/O threads. */ - monotime(&now); - thread_process_timers (m->timer, &now); - - /* Got IO, process it */ - if (num > 0) - thread_process_io (m, m->handler.copy, num, count); - - if ((thread = thread_trim_head (&m->ready)) != NULL) - { - fetch = thread_run (m, thread, fetch); - if (fetch->ref) - *fetch->ref = NULL; - pthread_mutex_unlock (&m->mtx); - return fetch; - } + } while (!thread && m->spin); - pthread_mutex_unlock (&m->mtx); - - } while (m->spin); + return fetch; +} - return NULL; +static unsigned long +timeval_elapsed (struct timeval a, struct timeval b) +{ + return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO) + + (a.tv_usec - b.tv_usec)); } unsigned long @@ -1281,8 +1475,6 @@ thread_getrusage (RUSAGE_T *r) getrusage(RUSAGE_SELF, &(r->cpu)); } -struct thread *thread_current = NULL; - /* We check thread consumed time. If the system has getrusage, we'll use that to get in-depth stats on the performance of the thread in addition to wall clock time stats from gettimeofday. */ @@ -1295,9 +1487,9 @@ thread_call (struct thread *thread) GETRUSAGE (&before); thread->real = before.real; - thread_current = thread; + pthread_setspecific (thread_current, thread); (*thread->func) (thread); - thread_current = NULL; + pthread_setspecific (thread_current, NULL); GETRUSAGE (&after); @@ -1350,12 +1542,8 @@ funcname_thread_execute (struct thread_master *m, tmp.func = dummy.func = func; tmp.funcname = dummy.funcname = funcname; - pthread_mutex_lock (&cpu_record_mtx); - { - dummy.hist = hash_get (cpu_record, &tmp, - (void * (*) (void *))cpu_record_hash_alloc); - } - pthread_mutex_unlock (&cpu_record_mtx); + dummy.hist = hash_get (m->cpu_record, &tmp, + (void * (*) (void *))cpu_record_hash_alloc); dummy.schedfrom = schedfrom; dummy.schedfrom_line = fromln; |
