summaryrefslogtreecommitdiff
path: root/lib/thread.c
diff options
context:
space:
mode:
authorDaniel Walton <dwalton@cumulusnetworks.com>2017-06-30 17:52:56 +0000
committerDaniel Walton <dwalton@cumulusnetworks.com>2017-06-30 17:52:56 +0000
commit1161690b93b48fbd07f4ee25c1261574db8d71c5 (patch)
tree7ffbe5c3b333b1fe0b8a3f042d8b1af602d48019 /lib/thread.c
parentab782c96f881b1fdd59f52ba972cd82b5eeadc66 (diff)
parent5fca4e3635c2778e8349bce0eaf944c26913d321 (diff)
Merge branch 'master' of https://github.com/dwalton76/frr into bgpd-ipv4-plus-label-misc3
Conflicts: bgpd/bgp_route.c
Diffstat (limited to 'lib/thread.c')
-rw-r--r--lib/thread.c934
1 files changed, 561 insertions, 373 deletions
diff --git a/lib/thread.c b/lib/thread.c
index bf3500fd8b..801168a799 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -47,16 +47,15 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
write (m->io_pipe[1], &wakebyte, 1); \
} while (0);
-static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER;
-static struct hash *cpu_record = NULL;
+/* control variable for initializer */
+pthread_once_t init_once = PTHREAD_ONCE_INIT;
+pthread_key_t thread_current;
+
+pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
+static struct list *masters;
-static unsigned long
-timeval_elapsed (struct timeval a, struct timeval b)
-{
- return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
- + (a.tv_usec - b.tv_usec));
-}
+/* CLI start ---------------------------------------------------------------- */
static unsigned int
cpu_record_hash_key (struct cpu_thread_history *a)
{
@@ -96,22 +95,22 @@ vty_out_cpu_thread_history(struct vty* vty,
a->total_active, a->cpu.total/1000, a->cpu.total%1000, a->total_calls,
a->cpu.total/a->total_calls, a->cpu.max,
a->real.total/a->total_calls, a->real.max);
- vty_out(vty, " %c%c%c%c%c %s%s",
+ vty_outln (vty, " %c%c%c%c%c %s",
a->types & (1 << THREAD_READ) ? 'R':' ',
a->types & (1 << THREAD_WRITE) ? 'W':' ',
a->types & (1 << THREAD_TIMER) ? 'T':' ',
a->types & (1 << THREAD_EVENT) ? 'E':' ',
a->types & (1 << THREAD_EXECUTE) ? 'X':' ',
- a->funcname, VTY_NEWLINE);
+ a->funcname);
}
static void
-cpu_record_hash_print(struct hash_backet *bucket,
- void *args[])
+cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
struct cpu_thread_history *totals = args[0];
struct vty *vty = args[1];
thread_type *filter = args[2];
+
struct cpu_thread_history *a = bucket->data;
if ( !(a->types & *filter) )
@@ -132,169 +131,174 @@ cpu_record_print(struct vty *vty, thread_type filter)
{
struct cpu_thread_history tmp;
void *args[3] = {&tmp, vty, &filter};
+ struct thread_master *m;
+ struct listnode *ln;
memset(&tmp, 0, sizeof tmp);
tmp.funcname = "TOTAL";
tmp.types = filter;
- vty_out(vty, "%21s %18s %18s%s",
- "", "CPU (user+system):", "Real (wall-clock):", VTY_NEWLINE);
- vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
- vty_out(vty, " Avg uSec Max uSecs");
- vty_out(vty, " Type Thread%s", VTY_NEWLINE);
-
- pthread_mutex_lock (&cpu_record_mtx);
+ pthread_mutex_lock (&masters_mtx);
{
- hash_iterate(cpu_record,
- (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
- args);
+ for (ALL_LIST_ELEMENTS_RO (masters, ln, m)) {
+ const char *name = m->name ? m->name : "main";
+
+ char underline[strlen(name) + 1];
+ memset (underline, '-', sizeof (underline));
+ underline[sizeof(underline)] = '\0';
+
+ vty_out (vty, VTYNL);
+ vty_outln(vty, "Showing statistics for pthread %s", name);
+ vty_outln(vty, "-------------------------------%s", underline);
+ vty_outln(vty, "%21s %18s %18s", "", "CPU (user+system):", "Real (wall-clock):");
+ vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
+ vty_out(vty, " Avg uSec Max uSecs");
+ vty_outln(vty, " Type Thread");
+
+ if (m->cpu_record->count)
+ hash_iterate(m->cpu_record,
+ (void (*)(struct hash_backet *, void *))
+ cpu_record_hash_print,
+ args);
+ else
+ vty_outln(vty, "No data to display yet.");
+
+ vty_out(vty, VTYNL);
+ }
}
- pthread_mutex_unlock (&cpu_record_mtx);
+ pthread_mutex_unlock (&masters_mtx);
+
+ vty_out(vty, VTYNL);
+ vty_outln(vty, "Total thread statistics");
+ vty_outln(vty, "-------------------------");
+ vty_outln(vty, "%21s %18s %18s", "", "CPU (user+system):", "Real (wall-clock):");
+ vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
+ vty_out(vty, " Avg uSec Max uSecs");
+ vty_outln(vty, " Type Thread");
if (tmp.total_calls > 0)
vty_out_cpu_thread_history(vty, &tmp);
}
-DEFUN (show_thread_cpu,
- show_thread_cpu_cmd,
- "show thread cpu [FILTER]",
- SHOW_STR
- "Thread information\n"
- "Thread CPU usage\n"
- "Display filter (rwtexb)\n")
-{
- int idx_filter = 3;
- int i = 0;
- thread_type filter = (thread_type) -1U;
-
- if (argc > 3)
- {
- filter = 0;
- while (argv[idx_filter]->arg[i] != '\0')
- {
- switch ( argv[idx_filter]->arg[i] )
- {
- case 'r':
- case 'R':
- filter |= (1 << THREAD_READ);
- break;
- case 'w':
- case 'W':
- filter |= (1 << THREAD_WRITE);
- break;
- case 't':
- case 'T':
- filter |= (1 << THREAD_TIMER);
- break;
- case 'e':
- case 'E':
- filter |= (1 << THREAD_EVENT);
- break;
- case 'x':
- case 'X':
- filter |= (1 << THREAD_EXECUTE);
- break;
- default:
- break;
- }
- ++i;
- }
- if (filter == 0)
- {
- vty_out(vty, "Invalid filter \"%s\" specified,"
- " must contain at least one of 'RWTEXB'%s",
- argv[idx_filter]->arg, VTY_NEWLINE);
- return CMD_WARNING;
- }
- }
-
- cpu_record_print(vty, filter);
- return CMD_SUCCESS;
-}
-
static void
-cpu_record_hash_clear (struct hash_backet *bucket,
- void *args)
+cpu_record_hash_clear (struct hash_backet *bucket, void *args[])
{
- thread_type *filter = args;
+ thread_type *filter = args[0];
+ struct hash *cpu_record = args[1];
+
struct cpu_thread_history *a = bucket->data;
if ( !(a->types & *filter) )
return;
- pthread_mutex_lock (&cpu_record_mtx);
- {
- hash_release (cpu_record, bucket->data);
- }
- pthread_mutex_unlock (&cpu_record_mtx);
+ hash_release (cpu_record, bucket->data);
}
static void
cpu_record_clear (thread_type filter)
{
thread_type *tmp = &filter;
+ struct thread_master *m;
+ struct listnode *ln;
- pthread_mutex_lock (&cpu_record_mtx);
+ pthread_mutex_lock (&masters_mtx);
{
- hash_iterate (cpu_record,
- (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
- tmp);
+ for (ALL_LIST_ELEMENTS_RO (masters, ln, m)) {
+ pthread_mutex_lock (&m->mtx);
+ {
+ void *args[2] = { tmp, m->cpu_record };
+ hash_iterate (m->cpu_record,
+ (void (*) (struct hash_backet*,void*))
+ cpu_record_hash_clear,
+ args);
+ }
+ pthread_mutex_unlock (&m->mtx);
+ }
}
- pthread_mutex_unlock (&cpu_record_mtx);
+ pthread_mutex_unlock (&masters_mtx);
+}
+
+static thread_type
+parse_filter (const char *filterstr)
+{
+ int i = 0;
+ int filter = 0;
+
+ while (filterstr[i] != '\0')
+ {
+ switch (filterstr[i])
+ {
+ case 'r':
+ case 'R':
+ filter |= (1 << THREAD_READ);
+ break;
+ case 'w':
+ case 'W':
+ filter |= (1 << THREAD_WRITE);
+ break;
+ case 't':
+ case 'T':
+ filter |= (1 << THREAD_TIMER);
+ break;
+ case 'e':
+ case 'E':
+ filter |= (1 << THREAD_EVENT);
+ break;
+ case 'x':
+ case 'X':
+ filter |= (1 << THREAD_EXECUTE);
+ break;
+ default:
+ break;
+ }
+ ++i;
+ }
+ return filter;
+}
+
+DEFUN (show_thread_cpu,
+ show_thread_cpu_cmd,
+ "show thread cpu [FILTER]",
+ SHOW_STR
+ "Thread information\n"
+ "Thread CPU usage\n"
+ "Display filter (rwtexb)\n")
+{
+ thread_type filter = (thread_type) -1U;
+ int idx = 0;
+
+ if (argv_find (argv, argc, "FILTER", &idx)) {
+ filter = parse_filter (argv[idx]->arg);
+ if (!filter) {
+ vty_outln(vty, "Invalid filter \"%s\" specified; must contain at least"
+ "one of 'RWTEXB'%s", argv[idx]->arg);
+ return CMD_WARNING;
+ }
+ }
+
+ cpu_record_print(vty, filter);
+ return CMD_SUCCESS;
}
DEFUN (clear_thread_cpu,
clear_thread_cpu_cmd,
"clear thread cpu [FILTER]",
- "Clear stored data\n"
+ "Clear stored data in all pthreads\n"
"Thread information\n"
"Thread CPU usage\n"
"Display filter (rwtexb)\n")
{
- int idx_filter = 3;
- int i = 0;
thread_type filter = (thread_type) -1U;
-
- if (argc > 3)
- {
- filter = 0;
- while (argv[idx_filter]->arg[i] != '\0')
- {
- switch ( argv[idx_filter]->arg[i] )
- {
- case 'r':
- case 'R':
- filter |= (1 << THREAD_READ);
- break;
- case 'w':
- case 'W':
- filter |= (1 << THREAD_WRITE);
- break;
- case 't':
- case 'T':
- filter |= (1 << THREAD_TIMER);
- break;
- case 'e':
- case 'E':
- filter |= (1 << THREAD_EVENT);
- break;
- case 'x':
- case 'X':
- filter |= (1 << THREAD_EXECUTE);
- break;
- default:
- break;
- }
- ++i;
- }
- if (filter == 0)
- {
- vty_out(vty, "Invalid filter \"%s\" specified,"
- " must contain at least one of 'RWTEXB'%s",
- argv[idx_filter]->arg, VTY_NEWLINE);
- return CMD_WARNING;
- }
+ int idx = 0;
+
+ if (argv_find (argv, argc, "FILTER", &idx)) {
+ filter = parse_filter (argv[idx]->arg);
+ if (!filter) {
+ vty_outln(vty, "Invalid filter \"%s\" specified; must contain at least"
+ "one of 'RWTEXB'%s", argv[idx]->arg);
+ return CMD_WARNING;
}
+ }
cpu_record_clear (filter);
return CMD_SUCCESS;
@@ -306,6 +310,8 @@ thread_cmd_init (void)
install_element (VIEW_NODE, &show_thread_cpu_cmd);
install_element (ENABLE_NODE, &clear_thread_cpu_cmd);
}
+/* CLI end ------------------------------------------------------------------ */
+
static int
thread_timer_cmp(void *a, void *b)
@@ -328,30 +334,43 @@ thread_timer_update(void *node, int actual_position)
thread->index = actual_position;
}
+static void
+cancelreq_del (void *cr)
+{
+ XFREE (MTYPE_TMP, cr);
+}
+
+/* initializer, only ever called once */
+static void initializer ()
+{
+ if (!masters)
+ masters = list_new();
+
+ pthread_key_create (&thread_current, NULL);
+}
+
/* Allocate new thread master. */
struct thread_master *
-thread_master_create (void)
+thread_master_create (const char *name)
{
struct thread_master *rv;
struct rlimit limit;
- getrlimit(RLIMIT_NOFILE, &limit);
-
- pthread_mutex_lock (&cpu_record_mtx);
- {
- if (cpu_record == NULL)
- cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
- (int (*) (const void *, const void *))
- cpu_record_hash_cmp);
- }
- pthread_mutex_unlock (&cpu_record_mtx);
+ pthread_once (&init_once, &initializer);
rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
if (rv == NULL)
return NULL;
+ /* Initialize master mutex */
pthread_mutex_init (&rv->mtx, NULL);
+ pthread_cond_init (&rv->cancel_cond, NULL);
+
+ /* Set name */
+ rv->name = name ? XSTRDUP (MTYPE_THREAD_MASTER, name) : NULL;
+ /* Initialize I/O task data structures */
+ getrlimit(RLIMIT_NOFILE, &limit);
rv->fd_limit = (int)limit.rlim_cur;
rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
if (rv->read == NULL)
@@ -359,7 +378,6 @@ thread_master_create (void)
XFREE (MTYPE_THREAD_MASTER, rv);
return NULL;
}
-
rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
if (rv->write == NULL)
{
@@ -368,17 +386,32 @@ thread_master_create (void)
return NULL;
}
+ rv->cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
+ (int (*) (const void *, const void *))
+ cpu_record_hash_cmp);
+
+
/* Initialize the timer queues */
rv->timer = pqueue_create();
rv->timer->cmp = thread_timer_cmp;
rv->timer->update = thread_timer_update;
+
+ /* Initialize thread_fetch() settings */
rv->spin = true;
rv->handle_signals = true;
+
+ /* Set pthread owner, should be updated by actual owner */
rv->owner = pthread_self();
+ rv->cancel_req = list_new ();
+ rv->cancel_req->del = cancelreq_del;
+ rv->canceled = true;
+
+ /* Initialize pipe poker */
pipe (rv->io_pipe);
set_nonblocking (rv->io_pipe[0]);
set_nonblocking (rv->io_pipe[1]);
+ /* Initialize data structures for poll() */
rv->handler.pfdsize = rv->fd_limit;
rv->handler.pfdcount = 0;
rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
@@ -386,6 +419,13 @@ thread_master_create (void)
rv->handler.copy = XCALLOC (MTYPE_THREAD_MASTER,
sizeof (struct pollfd) * rv->handler.pfdsize);
+ /* add to list */
+ pthread_mutex_lock (&masters_mtx);
+ {
+ listnode_add (masters, rv);
+ }
+ pthread_mutex_unlock (&masters_mtx);
+
return rv;
}
@@ -533,21 +573,15 @@ thread_master_free (struct thread_master *m)
pthread_mutex_destroy (&m->mtx);
close (m->io_pipe[0]);
close (m->io_pipe[1]);
+ list_delete (m->cancel_req);
+
+ hash_clean (m->cpu_record, cpu_record_hash_free);
+ hash_free (m->cpu_record);
+ m->cpu_record = NULL;
XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
XFREE (MTYPE_THREAD_MASTER, m->handler.copy);
XFREE (MTYPE_THREAD_MASTER, m);
-
- pthread_mutex_lock (&cpu_record_mtx);
- {
- if (cpu_record)
- {
- hash_clean (cpu_record, cpu_record_hash_free);
- hash_free (cpu_record);
- cpu_record = NULL;
- }
- }
- pthread_mutex_unlock (&cpu_record_mtx);
}
/* Return remain time in second. */
@@ -619,12 +653,8 @@ thread_get (struct thread_master *m, u_char type,
{
tmp.func = func;
tmp.funcname = funcname;
- pthread_mutex_lock (&cpu_record_mtx);
- {
- thread->hist = hash_get (cpu_record, &tmp,
- (void * (*) (void *))cpu_record_hash_alloc);
- }
- pthread_mutex_unlock (&cpu_record_mtx);
+ thread->hist = hash_get (m->cpu_record, &tmp,
+ (void * (*) (void *))cpu_record_hash_alloc);
}
thread->hist->total_active++;
thread->func = func;
@@ -637,7 +667,7 @@ thread_get (struct thread_master *m, u_char type,
static int
fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
- nfds_t count, struct timeval *timer_wait)
+ nfds_t count, const struct timeval *timer_wait)
{
/* If timer_wait is null here, that means poll() should block indefinitely,
* unless the thread_master has overriden it by setting ->selectpoll_timeout.
@@ -665,7 +695,7 @@ fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
num = poll (pfds, count + 1, timeout);
- static unsigned char trash[64];
+ unsigned char trash[64];
if (num > 0 && pfds[count].revents != 0 && num--)
while (read (m->io_pipe[0], &trash, sizeof (trash)) > 0);
@@ -864,150 +894,287 @@ funcname_thread_add_event (struct thread_master *m,
return thread;
}
+/* Thread cancellation ------------------------------------------------------ */
+
+/**
+ * NOT's out the .events field of pollfd corresponding to the given file
+ * descriptor. The event to be NOT'd is passed in the 'state' parameter.
+ *
+ * This needs to happen for both copies of pollfd's. See 'thread_fetch'
+ * implementation for details.
+ *
+ * @param master
+ * @param fd
+ * @param state the event to cancel. One or more (OR'd together) of the
+ * following:
+ * - POLLIN
+ * - POLLOUT
+ */
static void
-thread_cancel_read_or_write (struct thread *thread, short int state)
+thread_cancel_rw (struct thread_master *master, int fd, short state)
{
- for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i)
- if (thread->master->handler.pfds[i].fd == thread->u.fd)
- {
- thread->master->handler.pfds[i].events &= ~(state);
+ /* Cancel POLLHUP too just in case some bozo set it */
+ state |= POLLHUP;
- /* remove thread fds from pfd list */
- if (thread->master->handler.pfds[i].events == 0)
- {
- memmove(thread->master->handler.pfds+i,
- thread->master->handler.pfds+i+1,
- (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
- thread->master->handler.pfdcount--;
- return;
- }
- }
+ /* find the index of corresponding pollfd */
+ nfds_t i;
+
+ for (i = 0; i < master->handler.pfdcount; i++)
+ if (master->handler.pfds[i].fd == fd)
+ break;
+
+ /* NOT out event. */
+ master->handler.pfds[i].events &= ~(state);
+
+ /* If all events are canceled, delete / resize the pollfd array. */
+ if (master->handler.pfds[i].events == 0)
+ {
+ memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
+ (master->handler.pfdcount - i - 1) * sizeof (struct pollfd));
+ master->handler.pfdcount--;
+ }
+
+ /* If we have the same pollfd in the copy, perform the same operations,
+ * otherwise return. */
+ if (i >= master->handler.copycount)
+ return;
+
+ master->handler.copy[i].events &= ~(state);
+
+ if (master->handler.copy[i].events == 0)
+ {
+ memmove(master->handler.copy + i, master->handler.copy + i + 1,
+ (master->handler.copycount - i - 1) * sizeof (struct pollfd));
+ master->handler.copycount--;
+ }
}
/**
- * Cancel thread from scheduler.
+ * Process cancellation requests.
*
- * This function is *NOT* MT-safe. DO NOT call it from any other pthread except
- * the one which owns thread->master. You will crash.
+ * This may only be run from the pthread which owns the thread_master.
+ *
+ * @param master the thread master to process
+ * @REQUIRE master->mtx
*/
-void
-thread_cancel (struct thread *thread)
+static void
+do_thread_cancel (struct thread_master *master)
{
struct thread_list *list = NULL;
struct pqueue *queue = NULL;
struct thread **thread_array = NULL;
+ struct thread *thread;
- pthread_mutex_lock (&thread->mtx);
- pthread_mutex_lock (&thread->master->mtx);
+ struct cancel_req *cr;
+ struct listnode *ln;
+ for (ALL_LIST_ELEMENTS_RO (master->cancel_req, ln, cr))
+ {
+ /* If this is an event object cancellation, linear search through event
+ * list deleting any events which have the specified argument. We also
+ * need to check every thread in the ready queue. */
+ if (cr->eventobj)
+ {
+ struct thread *t;
+ thread = master->event.head;
+
+ while (thread)
+ {
+ t = thread;
+ thread = t->next;
+
+ if (t->arg == cr->eventobj)
+ {
+ thread_list_delete (&master->event, t);
+ if (t->ref)
+ *t->ref = NULL;
+ thread_add_unuse (master, t);
+ }
+ }
- assert (pthread_self() == thread->master->owner);
+ thread = master->ready.head;
+ while (thread)
+ {
+ t = thread;
+ thread = t->next;
+
+ if (t->arg == cr->eventobj)
+ {
+ thread_list_delete (&master->ready, t);
+ if (t->ref)
+ *t->ref = NULL;
+ thread_add_unuse (master, t);
+ }
+ }
+ continue;
+ }
- switch (thread->type)
- {
- case THREAD_READ:
- thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
- thread_array = thread->master->read;
- break;
- case THREAD_WRITE:
- thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
- thread_array = thread->master->write;
- break;
- case THREAD_TIMER:
- queue = thread->master->timer;
- break;
- case THREAD_EVENT:
- list = &thread->master->event;
- break;
- case THREAD_READY:
- list = &thread->master->ready;
- break;
- default:
- goto done;
- break;
- }
+ /* The pointer varies depending on whether the cancellation request was
+ * made asynchronously or not. If it was, we need to check whether the
+ * thread even exists anymore before cancelling it. */
+ thread = (cr->thread) ? cr->thread : *cr->threadref;
- if (queue)
- {
- assert(thread->index >= 0);
- pqueue_remove (thread, queue);
- }
- else if (list)
- {
- thread_list_delete (list, thread);
- }
- else if (thread_array)
- {
- thread_array[thread->u.fd] = NULL;
- }
- else
- {
- assert(!"Thread should be either in queue or list or array!");
+ if (!thread)
+ continue;
+
+ /* Determine the appropriate queue to cancel the thread from */
+ switch (thread->type)
+ {
+ case THREAD_READ:
+ thread_cancel_rw (master, thread->u.fd, POLLIN);
+ thread_array = master->read;
+ break;
+ case THREAD_WRITE:
+ thread_cancel_rw (master, thread->u.fd, POLLOUT);
+ thread_array = master->write;
+ break;
+ case THREAD_TIMER:
+ queue = master->timer;
+ break;
+ case THREAD_EVENT:
+ list = &master->event;
+ break;
+ case THREAD_READY:
+ list = &master->ready;
+ break;
+ default:
+ continue;
+ break;
+ }
+
+ if (queue)
+ {
+ assert(thread->index >= 0);
+ pqueue_remove (thread, queue);
+ }
+ else if (list)
+ {
+ thread_list_delete (list, thread);
+ }
+ else if (thread_array)
+ {
+ thread_array[thread->u.fd] = NULL;
+ }
+ else
+ {
+ assert(!"Thread should be either in queue or list or array!");
+ }
+
+ if (thread->ref)
+ *thread->ref = NULL;
+
+ thread_add_unuse (thread->master, thread);
}
- if (thread->ref)
- *thread->ref = NULL;
+ /* Delete and free all cancellation requests */
+ list_delete_all_node (master->cancel_req);
+
+ /* Wake up any threads which may be blocked in thread_cancel_async() */
+ master->canceled = true;
+ pthread_cond_broadcast (&master->cancel_cond);
+}
- thread_add_unuse (thread->master, thread);
+/**
+ * Cancel any events which have the specified argument.
+ *
+ * MT-Unsafe
+ *
+ * @param m the thread_master to cancel from
+ * @param arg the argument passed when creating the event
+ */
+void
+thread_cancel_event (struct thread_master *master, void *arg)
+{
+ assert (master->owner == pthread_self());
-done:
+ pthread_mutex_lock (&master->mtx);
+ {
+ struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+ cr->eventobj = arg;
+ listnode_add (master->cancel_req, cr);
+ do_thread_cancel(master);
+ }
+ pthread_mutex_unlock (&master->mtx);
+}
+
+/**
+ * Cancel a specific task.
+ *
+ * MT-Unsafe
+ *
+ * @param thread task to cancel
+ */
+void
+thread_cancel (struct thread *thread)
+{
+ assert (thread->master->owner == pthread_self());
+
+ pthread_mutex_lock (&thread->master->mtx);
+ {
+ struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+ cr->thread = thread;
+ listnode_add (thread->master->cancel_req, cr);
+ do_thread_cancel (thread->master);
+ }
pthread_mutex_unlock (&thread->master->mtx);
- pthread_mutex_unlock (&thread->mtx);
}
-/* Delete all events which has argument value arg. */
-unsigned int
-thread_cancel_event (struct thread_master *m, void *arg)
+/**
+ * Asynchronous cancellation.
+ *
+ * Called with either a struct thread ** or void * to an event argument,
+ * this function posts the correct cancellation request and blocks until it is
+ * serviced.
+ *
+ * If the thread is currently running, execution blocks until it completes.
+ *
+ * The last two parameters are mutually exclusive, i.e. if you pass one the
+ * other must be NULL.
+ *
+ * When the cancellation procedure executes on the target thread_master, the
+ * thread * provided is checked for nullity. If it is null, the thread is
+ * assumed to no longer exist and the cancellation request is a no-op. Thus
+ * users of this API must pass a back-reference when scheduling the original
+ * task.
+ *
+ * MT-Safe
+ *
+ * @param master the thread master with the relevant event / task
+ * @param thread pointer to thread to cancel
+ * @param eventobj the event
+ */
+void
+thread_cancel_async (struct thread_master *master, struct thread **thread,
+ void *eventobj)
{
- unsigned int ret = 0;
- struct thread *thread;
- struct thread *t;
+ assert (!(thread && eventobj) && (thread || eventobj));
+ assert (master->owner != pthread_self());
- pthread_mutex_lock (&m->mtx);
+ pthread_mutex_lock (&master->mtx);
{
- thread = m->event.head;
- while (thread)
- {
- t = thread;
- pthread_mutex_lock (&t->mtx);
- {
- thread = t->next;
+ master->canceled = false;
- if (t->arg == arg)
- {
- ret++;
- thread_list_delete (&m->event, t);
- if (t->ref)
- *t->ref = NULL;
- thread_add_unuse (m, t);
- }
- }
- pthread_mutex_unlock (&t->mtx);
+ if (thread)
+ {
+ struct cancel_req *cr =
+ XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+ cr->threadref = thread;
+ listnode_add (master->cancel_req, cr);
}
-
- /* thread can be on the ready list too */
- thread = m->ready.head;
- while (thread)
+ else if (eventobj)
{
- t = thread;
- pthread_mutex_lock (&t->mtx);
- {
- thread = t->next;
-
- if (t->arg == arg)
- {
- ret++;
- thread_list_delete (&m->ready, t);
- if (t->ref)
- *t->ref = NULL;
- thread_add_unuse (m, t);
- }
- }
- pthread_mutex_unlock (&t->mtx);
+ struct cancel_req *cr =
+ XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+ cr->eventobj = eventobj;
+ listnode_add (master->cancel_req, cr);
}
+ AWAKEN (master);
+
+ while (!master->canceled)
+ pthread_cond_wait (&master->cancel_cond, &master->mtx);
}
- pthread_mutex_unlock (&m->mtx);
- return ret;
+ pthread_mutex_unlock (&master->mtx);
}
+/* ------------------------------------------------------------------------- */
static struct timeval *
thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
@@ -1053,13 +1220,22 @@ thread_process_io_helper (struct thread_master *m, struct thread *thread,
return 1;
}
+/**
+ * Process I/O events.
+ *
+ * Walks through file descriptor array looking for those pollfds whose .revents
+ * field has something interesting. Deletes any invalid file descriptors.
+ *
+ * @param m the thread master
+ * @param num the number of active file descriptors (return value of poll())
+ */
static void
-thread_process_io (struct thread_master *m, struct pollfd *pfds,
- unsigned int num, unsigned int count)
+thread_process_io (struct thread_master *m, unsigned int num)
{
unsigned int ready = 0;
+ struct pollfd *pfds = m->handler.copy;
- for (nfds_t i = 0; i < count && ready < num ; ++i)
+ for (nfds_t i = 0; i < m->handler.copycount && ready < num ; ++i)
{
/* no event for current fd? immediately continue */
if (pfds[i].revents == 0)
@@ -1088,8 +1264,9 @@ thread_process_io (struct thread_master *m, struct pollfd *pfds,
m->handler.pfdcount--;
memmove (pfds + i, pfds + i + 1,
- (count - i - 1) * sizeof(struct pollfd));
- count--;
+ (m->handler.copycount - i - 1) * sizeof(struct pollfd));
+ m->handler.copycount--;
+
i--;
}
}
@@ -1139,98 +1316,115 @@ thread_process (struct thread_list *list)
struct thread *
thread_fetch (struct thread_master *m, struct thread *fetch)
{
- struct thread *thread;
+ struct thread *thread = NULL;
struct timeval now;
- struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
- struct timeval *timer_wait = &timer_val;
+ struct timeval zerotime = { 0, 0 };
+ struct timeval tv;
+ struct timeval *tw = NULL;
+
+ int num = 0;
+
+ do {
+ /* Handle signals if any */
+ if (m->handle_signals)
+ quagga_sigevent_process ();
+
+ pthread_mutex_lock (&m->mtx);
+
+ /* Process any pending cancellation requests */
+ do_thread_cancel (m);
+
+ /* Post events to ready queue. This must come before the following block
+ * since events should occur immediately */
+ thread_process (&m->event);
+
+ /* If there are no tasks on the ready queue, we will poll() until a timer
+ * expires or we receive I/O, whichever comes first. The strategy for doing
+ * this is:
+ *
+ * - If there are events pending, set the poll() timeout to zero
+ * - If there are no events pending, but there are timers pending, set the
+ * timeout to the smallest remaining time on any timer
+ * - If there are neither timers nor events pending, but there are file
+ * descriptors pending, block indefinitely in poll()
+ * - If nothing is pending, it's time for the application to die
+ *
+ * In every case except the last, we need to hit poll() at least once per
+ * loop to avoid starvation by events */
+
+ if (m->ready.count == 0)
+ tw = thread_timer_wait (m->timer, &tv);
+
+ if (m->ready.count != 0 || (tw && !timercmp (tw, &zerotime, >)))
+ tw = &zerotime;
+
+ if (!tw && m->handler.pfdcount == 0)
+ { /* die */
+ pthread_mutex_unlock (&m->mtx);
+ fetch = NULL;
+ break;
+ }
- do
+ /* Copy pollfd array + # active pollfds in it. Not necessary to copy
+ * the array size as this is fixed. */
+ m->handler.copycount = m->handler.pfdcount;
+ memcpy (m->handler.copy, m->handler.pfds,
+ m->handler.copycount * sizeof (struct pollfd));
+
+ pthread_mutex_unlock (&m->mtx);
{
- int num = 0;
+ num = fd_poll (m, m->handler.copy, m->handler.pfdsize,
+ m->handler.copycount, tw);
+ }
+ pthread_mutex_lock (&m->mtx);
- /* Signals pre-empt everything */
- if (m->handle_signals)
- quagga_sigevent_process ();
-
- pthread_mutex_lock (&m->mtx);
- /* Drain the ready queue of already scheduled jobs, before scheduling
- * more.
- */
- if ((thread = thread_trim_head (&m->ready)) != NULL)
- {
- fetch = thread_run (m, thread, fetch);
- if (fetch->ref)
- *fetch->ref = NULL;
- pthread_mutex_unlock (&m->mtx);
- return fetch;
- }
-
- /* To be fair to all kinds of threads, and avoid starvation, we
- * need to be careful to consider all thread types for scheduling
- * in each quanta. I.e. we should not return early from here on.
- */
-
- /* Normal event are the next highest priority. */
- thread_process (&m->event);
-
- /* Calculate select wait timer if nothing else to do */
- if (m->ready.count == 0)
- {
- timer_wait = thread_timer_wait (m->timer, &timer_val);
- }
+ /* Handle any errors received in poll() */
+ if (num < 0)
+ {
+ if (errno == EINTR)
+ {
+ pthread_mutex_unlock (&m->mtx);
+ continue; /* loop around to signal handler */
+ }
- if (timer_wait && timer_wait->tv_sec < 0)
- {
- timerclear(&timer_val);
- timer_wait = &timer_val;
- }
+ /* else die */
+ zlog_warn ("poll() error: %s", safe_strerror (errno));
+ pthread_mutex_unlock (&m->mtx);
+ fetch = NULL;
+ break;
+ }
- unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp;
- memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd));
+ /* Since we could have received more cancellation requests during poll(), process those */
+ do_thread_cancel (m);
- pthread_mutex_unlock (&m->mtx);
+ /* Post timers to ready queue. */
+ monotime(&now);
+ thread_process_timers (m->timer, &now);
+
+ /* Post I/O to ready queue. */
+ if (num > 0)
+ thread_process_io (m, num);
+
+ /* If we have a ready task, break the loop and return it to the caller */
+ if ((thread = thread_trim_head (&m->ready)))
{
- num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait);
+ fetch = thread_run (m, thread, fetch);
+ if (fetch->ref)
+ *fetch->ref = NULL;
}
- pthread_mutex_lock (&m->mtx);
- /* Signals should get quick treatment */
- if (num < 0)
- {
- if (errno == EINTR)
- {
- pthread_mutex_unlock (&m->mtx);
- continue; /* signal received - process it */
- }
- zlog_warn ("poll() error: %s", safe_strerror (errno));
- pthread_mutex_unlock (&m->mtx);
- return NULL;
- }
+ pthread_mutex_unlock (&m->mtx);
- /* Check foreground timers. Historically, they have had higher
- * priority than I/O threads, so let's push them onto the ready
- * list in front of the I/O threads. */
- monotime(&now);
- thread_process_timers (m->timer, &now);
-
- /* Got IO, process it */
- if (num > 0)
- thread_process_io (m, m->handler.copy, num, count);
-
- if ((thread = thread_trim_head (&m->ready)) != NULL)
- {
- fetch = thread_run (m, thread, fetch);
- if (fetch->ref)
- *fetch->ref = NULL;
- pthread_mutex_unlock (&m->mtx);
- return fetch;
- }
+ } while (!thread && m->spin);
- pthread_mutex_unlock (&m->mtx);
-
- } while (m->spin);
+ return fetch;
+}
- return NULL;
+static unsigned long
+timeval_elapsed (struct timeval a, struct timeval b)
+{
+ return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
+ + (a.tv_usec - b.tv_usec));
}
unsigned long
@@ -1281,8 +1475,6 @@ thread_getrusage (RUSAGE_T *r)
getrusage(RUSAGE_SELF, &(r->cpu));
}
-struct thread *thread_current = NULL;
-
/* We check thread consumed time. If the system has getrusage, we'll
use that to get in-depth stats on the performance of the thread in addition
to wall clock time stats from gettimeofday. */
@@ -1295,9 +1487,9 @@ thread_call (struct thread *thread)
GETRUSAGE (&before);
thread->real = before.real;
- thread_current = thread;
+ pthread_setspecific (thread_current, thread);
(*thread->func) (thread);
- thread_current = NULL;
+ pthread_setspecific (thread_current, NULL);
GETRUSAGE (&after);
@@ -1350,12 +1542,8 @@ funcname_thread_execute (struct thread_master *m,
tmp.func = dummy.func = func;
tmp.funcname = dummy.funcname = funcname;
- pthread_mutex_lock (&cpu_record_mtx);
- {
- dummy.hist = hash_get (cpu_record, &tmp,
- (void * (*) (void *))cpu_record_hash_alloc);
- }
- pthread_mutex_unlock (&cpu_record_mtx);
+ dummy.hist = hash_get (m->cpu_record, &tmp,
+ (void * (*) (void *))cpu_record_hash_alloc);
dummy.schedfrom = schedfrom;
dummy.schedfrom_line = fromln;