summaryrefslogtreecommitdiff
path: root/lib/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/thread.c')
-rw-r--r--lib/thread.c759
1 files changed, 422 insertions, 337 deletions
diff --git a/lib/thread.c b/lib/thread.c
index dbdd91dd24..8c54ec6cea 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -13,10 +13,9 @@
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
- * You should have received a copy of the GNU General Public License
- * along with GNU Zebra; see the file COPYING. If not, write to the Free
- * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
+ * You should have received a copy of the GNU General Public License along
+ * with this program; see the file COPYING; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/* #define DEBUG */
@@ -31,6 +30,7 @@
#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
+#include "network.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
@@ -41,7 +41,13 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#include <mach/mach_time.h>
#endif
-/* Relative time, since startup */
+#define AWAKEN(m) \
+ do { \
+ static unsigned char wakebyte = 0x01; \
+ write (m->io_pipe[1], &wakebyte, 1); \
+ } while (0);
+
+static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct hash *cpu_record = NULL;
static unsigned long
@@ -137,9 +143,14 @@ cpu_record_print(struct vty *vty, thread_type filter)
vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
vty_out(vty, " Avg uSec Max uSecs");
vty_out(vty, " Type Thread%s", VTY_NEWLINE);
- hash_iterate(cpu_record,
- (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
- args);
+
+ pthread_mutex_lock (&cpu_record_mtx);
+ {
+ hash_iterate(cpu_record,
+ (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
+ args);
+ }
+ pthread_mutex_unlock (&cpu_record_mtx);
if (tmp.total_calls > 0)
vty_out_cpu_thread_history(vty, &tmp);
@@ -216,16 +227,25 @@ cpu_record_hash_clear (struct hash_backet *bucket,
if ( !(a->types & *filter) )
return;
- hash_release (cpu_record, bucket->data);
+ pthread_mutex_lock (&cpu_record_mtx);
+ {
+ hash_release (cpu_record, bucket->data);
+ }
+ pthread_mutex_unlock (&cpu_record_mtx);
}
static void
cpu_record_clear (thread_type filter)
{
thread_type *tmp = &filter;
- hash_iterate (cpu_record,
- (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
- tmp);
+
+ pthread_mutex_lock (&cpu_record_mtx);
+ {
+ hash_iterate (cpu_record,
+ (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
+ tmp);
+ }
+ pthread_mutex_unlock (&cpu_record_mtx);
}
DEFUN (clear_thread_cpu,
@@ -326,16 +346,20 @@ thread_master_create (void)
getrlimit(RLIMIT_NOFILE, &limit);
- if (cpu_record == NULL)
- cpu_record
- = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
- (int (*) (const void *, const void *))cpu_record_hash_cmp);
+ pthread_mutex_lock (&cpu_record_mtx);
+ {
+ if (cpu_record == NULL)
+ cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
+ (int (*) (const void *, const void *))
+ cpu_record_hash_cmp);
+ }
+ pthread_mutex_unlock (&cpu_record_mtx);
rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
if (rv == NULL)
- {
- return NULL;
- }
+ return NULL;
+
+ pthread_mutex_init (&rv->mtx, NULL);
rv->fd_limit = (int)limit.rlim_cur;
rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
@@ -358,13 +382,20 @@ thread_master_create (void)
rv->background = pqueue_create();
rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
rv->timer->update = rv->background->update = thread_timer_update;
+ rv->spin = true;
+ rv->handle_signals = true;
+ rv->owner = pthread_self();
+ pipe (rv->io_pipe);
+ set_nonblocking (rv->io_pipe[0]);
+ set_nonblocking (rv->io_pipe[1]);
-#if defined(HAVE_POLL_CALL)
rv->handler.pfdsize = rv->fd_limit;
rv->handler.pfdcount = 0;
rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
sizeof (struct pollfd) * rv->handler.pfdsize);
-#endif
+ rv->handler.copy = XCALLOC (MTYPE_THREAD_MASTER,
+ sizeof (struct pollfd) * rv->handler.pfdsize);
+
return rv;
}
@@ -399,18 +430,6 @@ thread_list_delete (struct thread_list *list, struct thread *thread)
return thread;
}
-static void
-thread_delete_fd (struct thread **thread_array, struct thread *thread)
-{
- thread_array[thread->u.fd] = NULL;
-}
-
-static void
-thread_add_fd (struct thread **thread_array, struct thread *thread)
-{
- thread_array[thread->u.fd] = thread;
-}
-
/* Thread list is empty or not. */
static int
thread_empty (struct thread_list *list)
@@ -434,6 +453,7 @@ thread_add_unuse (struct thread_master *m, struct thread *thread)
assert (m != NULL && thread != NULL);
assert (thread->next == NULL);
assert (thread->prev == NULL);
+ thread->ref = NULL;
thread->type = THREAD_UNUSED;
thread->hist->total_active--;
@@ -498,11 +518,16 @@ thread_queue_free (struct thread_master *m, struct pqueue *queue)
void
thread_master_free_unused (struct thread_master *m)
{
- struct thread *t;
- while ((t = thread_trim_head(&m->unuse)) != NULL)
- {
- XFREE(MTYPE_THREAD, t);
- }
+ pthread_mutex_lock (&m->mtx);
+ {
+ struct thread *t;
+ while ((t = thread_trim_head(&m->unuse)) != NULL)
+ {
+ pthread_mutex_destroy (&t->mtx);
+ XFREE(MTYPE_THREAD, t);
+ }
+ }
+ pthread_mutex_unlock (&m->mtx);
}
/* Stop thread scheduler. */
@@ -516,25 +541,38 @@ thread_master_free (struct thread_master *m)
thread_list_free (m, &m->ready);
thread_list_free (m, &m->unuse);
thread_queue_free (m, m->background);
+ pthread_mutex_destroy (&m->mtx);
+ close (m->io_pipe[0]);
+ close (m->io_pipe[1]);
-#if defined(HAVE_POLL_CALL)
XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
-#endif
+ XFREE (MTYPE_THREAD_MASTER, m->handler.copy);
XFREE (MTYPE_THREAD_MASTER, m);
- if (cpu_record)
- {
- hash_clean (cpu_record, cpu_record_hash_free);
- hash_free (cpu_record);
- cpu_record = NULL;
- }
+ pthread_mutex_lock (&cpu_record_mtx);
+ {
+ if (cpu_record)
+ {
+ hash_clean (cpu_record, cpu_record_hash_free);
+ hash_free (cpu_record);
+ cpu_record = NULL;
+ }
+ }
+ pthread_mutex_unlock (&cpu_record_mtx);
}
/* Return remain time in second. */
unsigned long
thread_timer_remain_second (struct thread *thread)
{
- int64_t remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
+ int64_t remain;
+
+ pthread_mutex_lock (&thread->mtx);
+ {
+ remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
+ }
+ pthread_mutex_unlock (&thread->mtx);
+
return remain < 0 ? 0 : remain;
}
@@ -545,7 +583,11 @@ struct timeval
thread_timer_remain(struct thread *thread)
{
struct timeval remain;
- monotime_until(&thread->u.sands, &remain);
+ pthread_mutex_lock (&thread->mtx);
+ {
+ monotime_until(&thread->u.sands, &remain);
+ }
+ pthread_mutex_unlock (&thread->mtx);
return remain;
}
@@ -560,14 +602,18 @@ thread_get (struct thread_master *m, u_char type,
if (! thread)
{
thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
+ /* mutex only needs to be initialized at struct creation. */
+ pthread_mutex_init (&thread->mtx, NULL);
m->alloc++;
}
+
thread->type = type;
thread->add_type = type;
thread->master = m;
thread->arg = arg;
thread->index = -1;
thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
+ thread->ref = NULL;
/*
* So if the passed in funcname is not what we have
@@ -584,8 +630,12 @@ thread_get (struct thread_master *m, u_char type,
{
tmp.func = func;
tmp.funcname = funcname;
- thread->hist = hash_get (cpu_record, &tmp,
- (void * (*) (void *))cpu_record_hash_alloc);
+ pthread_mutex_lock (&cpu_record_mtx);
+ {
+ thread->hist = hash_get (cpu_record, &tmp,
+ (void * (*) (void *))cpu_record_hash_alloc);
+ }
+ pthread_mutex_unlock (&cpu_record_mtx);
}
thread->hist->total_active++;
thread->func = func;
@@ -596,154 +646,111 @@ thread_get (struct thread_master *m, u_char type,
return thread;
}
-#if defined (HAVE_POLL_CALL)
-
-#define fd_copy_fd_set(X) (X)
-
-/* generic add thread function */
-static struct thread *
-generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
- void *arg, int fd, int dir, debugargdef)
-{
- struct thread *thread;
-
- u_char type;
- short int event;
-
- if (dir == THREAD_READ)
- {
- event = (POLLIN | POLLHUP);
- type = THREAD_READ;
- }
- else
- {
- event = (POLLOUT | POLLHUP);
- type = THREAD_WRITE;
- }
-
- nfds_t queuepos = m->handler.pfdcount;
- nfds_t i=0;
- for (i=0; i<m->handler.pfdcount; i++)
- if (m->handler.pfds[i].fd == fd)
- {
- queuepos = i;
- break;
- }
-
- /* is there enough space for a new fd? */
- assert (queuepos < m->handler.pfdsize);
-
- thread = thread_get (m, type, func, arg, debugargpass);
- m->handler.pfds[queuepos].fd = fd;
- m->handler.pfds[queuepos].events |= event;
- if (queuepos == m->handler.pfdcount)
- m->handler.pfdcount++;
-
- return thread;
-}
-#else
-
-#define fd_copy_fd_set(X) (X)
-#endif
-
static int
-fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
+fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
+ nfds_t count, struct timeval *timer_wait)
{
- int num;
-#if defined(HAVE_POLL_CALL)
- /* recalc timeout for poll. Attention NULL pointer is no timeout with
- select, where with poll no timeount is -1 */
+ /* If timer_wait is null here, that means poll() should block indefinitely,
+ * unless the thread_master has overriden it by setting ->selectpoll_timeout.
+ * If the value is positive, it specifies the maximum number of milliseconds
+ * to wait. If the timeout is -1, it specifies that we should never wait and
+ * always return immediately even if no event is detected. If the value is
+ * zero, the behavior is default. */
int timeout = -1;
- if (timer_wait != NULL)
- timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
- num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
-#else
- num = select (size, read, write, except, timer_wait);
-#endif
+ /* number of file descriptors with events */
+ int num;
- return num;
-}
+ if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value
+ timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
+ else if (m->selectpoll_timeout > 0) // use the user's timeout
+ timeout = m->selectpoll_timeout;
+ else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
+ timeout = 0;
-static int
-fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
-{
-#if defined(HAVE_POLL_CALL)
- return 1;
-#else
- return FD_ISSET (THREAD_FD (thread), fdset);
-#endif
-}
+ /* add poll pipe poker */
+ assert (count + 1 < pfdsize);
+ pfds[count].fd = m->io_pipe[0];
+ pfds[count].events = POLLIN;
+ pfds[count].revents = 0x00;
-static int
-fd_clear_read_write (struct thread *thread)
-{
-#if !defined(HAVE_POLL_CALL)
- thread_fd_set *fdset = NULL;
- int fd = THREAD_FD (thread);
-
- if (thread->type == THREAD_READ)
- fdset = &thread->master->handler.readfd;
- else
- fdset = &thread->master->handler.writefd;
+ num = poll (pfds, count + 1, timeout);
- if (!FD_ISSET (fd, fdset))
- return 0;
+ static unsigned char trash[64];
+ if (num > 0 && pfds[count].revents != 0 && num--)
+ while (read (m->io_pipe[0], &trash, sizeof (trash)) > 0);
- FD_CLR (fd, fdset);
-#endif
- return 1;
+ return num;
}
/* Add new read thread. */
struct thread *
funcname_thread_add_read_write (int dir, struct thread_master *m,
- int (*func) (struct thread *), void *arg, int fd,
- debugargdef)
+ int (*func) (struct thread *), void *arg, int fd, struct thread **t_ptr,
+ debugargdef)
{
struct thread *thread = NULL;
-#if !defined(HAVE_POLL_CALL)
- thread_fd_set *fdset = NULL;
- if (dir == THREAD_READ)
- fdset = &m->handler.readfd;
- else
- fdset = &m->handler.writefd;
-#endif
+ pthread_mutex_lock (&m->mtx);
+ {
+ if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule
+ {
+ pthread_mutex_unlock (&m->mtx);
+ return NULL;
+ }
-#if defined (HAVE_POLL_CALL)
- thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
+ /* default to a new pollfd */
+ nfds_t queuepos = m->handler.pfdcount;
- if (thread == NULL)
- return NULL;
-#else
- if (FD_ISSET (fd, fdset))
- {
- zlog_warn ("There is already %s fd [%d]",
- (dir == THREAD_READ) ? "read" : "write", fd);
- return NULL;
- }
+ /* if we already have a pollfd for our file descriptor, find and use it */
+ for (nfds_t i = 0; i < m->handler.pfdcount; i++)
+ if (m->handler.pfds[i].fd == fd)
+ {
+ queuepos = i;
+ break;
+ }
- FD_SET (fd, fdset);
- thread = thread_get (m, dir, func, arg, debugargpass);
-#endif
+ /* make sure we have room for this fd + pipe poker fd */
+ assert (queuepos + 1 < m->handler.pfdsize);
- thread->u.fd = fd;
- if (dir == THREAD_READ)
- thread_add_fd (m->read, thread);
- else
- thread_add_fd (m->write, thread);
+ thread = thread_get (m, dir, func, arg, debugargpass);
+
+ m->handler.pfds[queuepos].fd = fd;
+ m->handler.pfds[queuepos].events |= (dir == THREAD_READ ? POLLIN : POLLOUT);
+
+ if (queuepos == m->handler.pfdcount)
+ m->handler.pfdcount++;
+
+ if (thread)
+ {
+ pthread_mutex_lock (&thread->mtx);
+ {
+ thread->u.fd = fd;
+ if (dir == THREAD_READ)
+ m->read[thread->u.fd] = thread;
+ else
+ m->write[thread->u.fd] = thread;
+ }
+ pthread_mutex_unlock (&thread->mtx);
+
+ if (t_ptr)
+ {
+ *t_ptr = thread;
+ thread->ref = t_ptr;
+ }
+ }
+
+ AWAKEN (m);
+ }
+ pthread_mutex_unlock (&m->mtx);
return thread;
}
static struct thread *
funcname_thread_add_timer_timeval (struct thread_master *m,
- int (*func) (struct thread *),
- int type,
- void *arg,
- struct timeval *time_relative,
- debugargdef)
+ int (*func) (struct thread *), int type, void *arg,
+ struct timeval *time_relative, struct thread **t_ptr, debugargdef)
{
struct thread *thread;
struct pqueue *queue;
@@ -753,13 +760,34 @@ funcname_thread_add_timer_timeval (struct thread_master *m,
assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
assert (time_relative);
- queue = ((type == THREAD_TIMER) ? m->timer : m->background);
- thread = thread_get (m, type, func, arg, debugargpass);
+ pthread_mutex_lock (&m->mtx);
+ {
+ if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule
+ {
+ pthread_mutex_unlock (&m->mtx);
+ return NULL;
+ }
+
+ queue = ((type == THREAD_TIMER) ? m->timer : m->background);
+ thread = thread_get (m, type, func, arg, debugargpass);
+
+ pthread_mutex_lock (&thread->mtx);
+ {
+ monotime(&thread->u.sands);
+ timeradd(&thread->u.sands, time_relative, &thread->u.sands);
+ pqueue_enqueue(thread, queue);
+ if (t_ptr)
+ {
+ *t_ptr = thread;
+ thread->ref = t_ptr;
+ }
+ }
+ pthread_mutex_unlock (&thread->mtx);
- monotime(&thread->u.sands);
- timeradd(&thread->u.sands, time_relative, &thread->u.sands);
+ AWAKEN (m);
+ }
+ pthread_mutex_unlock (&m->mtx);
- pqueue_enqueue(thread, queue);
return thread;
}
@@ -767,9 +795,8 @@ funcname_thread_add_timer_timeval (struct thread_master *m,
/* Add timer event thread. */
struct thread *
funcname_thread_add_timer (struct thread_master *m,
- int (*func) (struct thread *),
- void *arg, long timer,
- debugargdef)
+ int (*func) (struct thread *), void *arg, long timer,
+ struct thread **t_ptr, debugargdef)
{
struct timeval trel;
@@ -778,16 +805,15 @@ funcname_thread_add_timer (struct thread_master *m,
trel.tv_sec = timer;
trel.tv_usec = 0;
- return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg,
- &trel, debugargpass);
+ return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, &trel,
+ t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *
funcname_thread_add_timer_msec (struct thread_master *m,
- int (*func) (struct thread *),
- void *arg, long timer,
- debugargdef)
+ int (*func) (struct thread *), void *arg, long timer,
+ struct thread **t_ptr, debugargdef)
{
struct timeval trel;
@@ -796,27 +822,25 @@ funcname_thread_add_timer_msec (struct thread_master *m,
trel.tv_sec = timer / 1000;
trel.tv_usec = 1000*(timer % 1000);
- return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
- arg, &trel, debugargpass);
+ return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, &trel,
+ t_ptr, debugargpass);
}
/* Add timer event thread with "millisecond" resolution */
struct thread *
funcname_thread_add_timer_tv (struct thread_master *m,
- int (*func) (struct thread *),
- void *arg, struct timeval *tv,
- debugargdef)
+ int (*func) (struct thread *), void *arg, struct timeval *tv,
+ struct thread **t_ptr, debugargdef)
{
- return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
- arg, tv, debugargpass);
+ return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg, tv,
+ t_ptr, debugargpass);
}
/* Add a background thread, with an optional millisec delay */
struct thread *
funcname_thread_add_background (struct thread_master *m,
- int (*func) (struct thread *),
- void *arg, long delay,
- debugargdef)
+ int (*func) (struct thread *), void *arg, long delay,
+ struct thread **t_ptr, debugargdef)
{
struct timeval trel;
@@ -833,23 +857,45 @@ funcname_thread_add_background (struct thread_master *m,
trel.tv_usec = 0;
}
- return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND,
- arg, &trel, debugargpass);
+ return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND, arg, &trel,
+ t_ptr, debugargpass);
}
/* Add simple event thread. */
struct thread *
funcname_thread_add_event (struct thread_master *m,
- int (*func) (struct thread *), void *arg, int val,
- debugargdef)
+ int (*func) (struct thread *), void *arg, int val,
+ struct thread **t_ptr, debugargdef)
{
struct thread *thread;
assert (m != NULL);
- thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
- thread->u.val = val;
- thread_list_add (&m->event, thread);
+ pthread_mutex_lock (&m->mtx);
+ {
+ if (t_ptr && *t_ptr) // thread is already scheduled; don't reschedule
+ {
+ pthread_mutex_unlock (&m->mtx);
+ return NULL;
+ }
+
+ thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
+ pthread_mutex_lock (&thread->mtx);
+ {
+ thread->u.val = val;
+ thread_list_add (&m->event, thread);
+ }
+ pthread_mutex_unlock (&thread->mtx);
+
+ if (t_ptr)
+ {
+ *t_ptr = thread;
+ thread->ref = t_ptr;
+ }
+
+ AWAKEN (m);
+ }
+ pthread_mutex_unlock (&m->mtx);
return thread;
}
@@ -857,10 +903,7 @@ funcname_thread_add_event (struct thread_master *m,
static void
thread_cancel_read_or_write (struct thread *thread, short int state)
{
-#if defined(HAVE_POLL_CALL)
- nfds_t i;
-
- for (i=0;i<thread->master->handler.pfdcount;++i)
+ for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i)
if (thread->master->handler.pfds[i].fd == thread->u.fd)
{
thread->master->handler.pfds[i].events &= ~(state);
@@ -875,35 +918,34 @@ thread_cancel_read_or_write (struct thread *thread, short int state)
return;
}
}
-#endif
-
- fd_clear_read_write (thread);
}
-/* Cancel thread from scheduler. */
+/**
+ * Cancel thread from scheduler.
+ *
+ * This function is *NOT* MT-safe. DO NOT call it from any other pthread except
+ * the one which owns thread->master. You will crash.
+ */
void
thread_cancel (struct thread *thread)
{
struct thread_list *list = NULL;
struct pqueue *queue = NULL;
struct thread **thread_array = NULL;
-
+
+ pthread_mutex_lock (&thread->mtx);
+ pthread_mutex_lock (&thread->master->mtx);
+
+ assert (pthread_self() == thread->master->owner);
+
switch (thread->type)
{
case THREAD_READ:
-#if defined (HAVE_POLL_CALL)
thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
-#else
- thread_cancel_read_or_write (thread, 0);
-#endif
thread_array = thread->master->read;
break;
case THREAD_WRITE:
-#if defined (HAVE_POLL_CALL)
thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
-#else
- thread_cancel_read_or_write (thread, 0);
-#endif
thread_array = thread->master->write;
break;
case THREAD_TIMER:
@@ -919,15 +961,14 @@ thread_cancel (struct thread *thread)
queue = thread->master->background;
break;
default:
- return;
+ goto done;
break;
}
if (queue)
{
assert(thread->index >= 0);
- assert(thread == queue->array[thread->index]);
- pqueue_remove_at(thread->index, queue);
+ pqueue_remove (thread, queue);
}
else if (list)
{
@@ -935,14 +976,21 @@ thread_cancel (struct thread *thread)
}
else if (thread_array)
{
- thread_delete_fd (thread_array, thread);
+ thread_array[thread->u.fd] = NULL;
}
else
{
assert(!"Thread should be either in queue or list or array!");
}
+ if (thread->ref)
+ *thread->ref = NULL;
+
thread_add_unuse (thread->master, thread);
+
+done:
+ pthread_mutex_unlock (&thread->master->mtx);
+ pthread_mutex_unlock (&thread->mtx);
}
/* Delete all events which has argument value arg. */
@@ -951,39 +999,52 @@ thread_cancel_event (struct thread_master *m, void *arg)
{
unsigned int ret = 0;
struct thread *thread;
+ struct thread *t;
- thread = m->event.head;
- while (thread)
- {
- struct thread *t;
-
- t = thread;
- thread = t->next;
-
- if (t->arg == arg)
+ pthread_mutex_lock (&m->mtx);
+ {
+ thread = m->event.head;
+ while (thread)
+ {
+ t = thread;
+ pthread_mutex_lock (&t->mtx);
{
- ret++;
- thread_list_delete (&m->event, t);
- thread_add_unuse (m, t);
+ thread = t->next;
+
+ if (t->arg == arg)
+ {
+ ret++;
+ thread_list_delete (&m->event, t);
+ if (t->ref)
+ *t->ref = NULL;
+ thread_add_unuse (m, t);
+ }
}
- }
-
- /* thread can be on the ready list too */
- thread = m->ready.head;
- while (thread)
- {
- struct thread *t;
-
- t = thread;
- thread = t->next;
+ pthread_mutex_unlock (&t->mtx);
+ }
- if (t->arg == arg)
+ /* thread can be on the ready list too */
+ thread = m->ready.head;
+ while (thread)
+ {
+ t = thread;
+ pthread_mutex_lock (&t->mtx);
{
- ret++;
- thread_list_delete (&m->ready, t);
- thread_add_unuse (m, t);
+ thread = t->next;
+
+ if (t->arg == arg)
+ {
+ ret++;
+ thread_list_delete (&m->ready, t);
+ if (t->ref)
+ *t->ref = NULL;
+ thread_add_unuse (m, t);
+ }
}
- }
+ pthread_mutex_unlock (&t->mtx);
+ }
+ }
+ pthread_mutex_unlock (&m->mtx);
return ret;
}
@@ -1001,7 +1062,7 @@ thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
static struct thread *
thread_run (struct thread_master *m, struct thread *thread,
- struct thread *fetch)
+ struct thread *fetch)
{
*fetch = *thread;
thread_add_unuse (m, thread);
@@ -1009,7 +1070,8 @@ thread_run (struct thread_master *m, struct thread *thread,
}
static int
-thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
+thread_process_io_helper (struct thread_master *m, struct thread *thread,
+ short state, int pos)
{
struct thread **thread_array;
@@ -1021,76 +1083,60 @@ thread_process_fds_helper (struct thread_master *m, struct thread *thread, threa
else
thread_array = m->write;
- if (fd_is_set (thread, fdset, pos))
- {
- fd_clear_read_write (thread);
- thread_delete_fd (thread_array, thread);
- thread_list_add (&m->ready, thread);
- thread->type = THREAD_READY;
-#if defined(HAVE_POLL_CALL)
- thread->master->handler.pfds[pos].events &= ~(state);
-#endif
- return 1;
- }
- return 0;
+ thread_array[thread->u.fd] = NULL;
+ thread_list_add (&m->ready, thread);
+ thread->type = THREAD_READY;
+ /* if another pthread scheduled this file descriptor for the event we're
+ * responding to, no problem; we're getting to it now */
+ thread->master->handler.pfds[pos].events &= ~(state);
+ return 1;
}
-#if defined(HAVE_POLL_CALL)
-
-/* check poll events */
static void
-check_pollfds(struct thread_master *m, fd_set *readfd, int num)
+thread_process_io (struct thread_master *m, struct pollfd *pfds,
+ unsigned int num, unsigned int count)
{
- nfds_t i = 0;
- int ready = 0;
- for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
+ unsigned int ready = 0;
+
+ for (nfds_t i = 0; i < count && ready < num ; ++i)
{
- /* no event for current fd? immideatly continue */
- if(m->handler.pfds[i].revents == 0)
+ /* no event for current fd? immediately continue */
+ if (pfds[i].revents == 0)
continue;
ready++;
- /* POLLIN / POLLOUT process event */
- if (m->handler.pfds[i].revents & (POLLIN | POLLHUP))
- thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
- if (m->handler.pfds[i].revents & POLLOUT)
- thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
-
- /* remove fd from list on POLLNVAL */
- if (m->handler.pfds[i].revents & POLLNVAL)
+ /* Unless someone has called thread_cancel from another pthread, the only
+ * thing that could have changed in m->handler.pfds while we were
+ * asleep is the .events field in a given pollfd. Barring thread_cancel()
+ * that value should be a superset of the values we have in our copy, so
+ * there's no need to update it. Similarily, barring deletion, the fd
+ * should still be a valid index into the master's pfds. */
+ if (pfds[i].revents & (POLLIN | POLLHUP))
+ thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN, i);
+ if (pfds[i].revents & POLLOUT)
+ thread_process_io_helper(m, m->write[pfds[i].fd], POLLOUT, i);
+
+ /* if one of our file descriptors is garbage, remove the same from
+ * both pfds + update sizes and index */
+ if (pfds[i].revents & POLLNVAL)
{
- memmove(m->handler.pfds+i,
- m->handler.pfds+i+1,
- (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
- m->handler.pfdcount--;
- i--;
+ memmove (m->handler.pfds + i,
+ m->handler.pfds + i + 1,
+ (m->handler.pfdcount - i - 1) * sizeof(struct pollfd));
+ m->handler.pfdcount--;
+
+ memmove (pfds + i, pfds + i + 1,
+ (count - i - 1) * sizeof(struct pollfd));
+ count--;
+ i--;
}
- else
- m->handler.pfds[i].revents = 0;
- }
-}
-#endif
-
-static void
-thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
-{
-#if defined (HAVE_POLL_CALL)
- check_pollfds (m, rset, num);
-#else
- int ready = 0, index;
-
- for (index = 0; index < m->fd_limit && ready < num; ++index)
- {
- ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
- ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
}
-#endif
}
/* Add all timers that have popped to the ready list. */
static unsigned int
-thread_timer_process (struct pqueue *queue, struct timeval *timenow)
+thread_process_timers (struct pqueue *queue, struct timeval *timenow)
{
struct thread *thread;
unsigned int ready = 0;
@@ -1133,27 +1179,32 @@ struct thread *
thread_fetch (struct thread_master *m, struct thread *fetch)
{
struct thread *thread;
- thread_fd_set readfd;
- thread_fd_set writefd;
- thread_fd_set exceptfd;
struct timeval now;
struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
struct timeval timer_val_bg;
struct timeval *timer_wait = &timer_val;
struct timeval *timer_wait_bg;
- while (1)
+ do
{
int num = 0;
/* Signals pre-empt everything */
- quagga_sigevent_process ();
+ if (m->handle_signals)
+ quagga_sigevent_process ();
+ pthread_mutex_lock (&m->mtx);
/* Drain the ready queue of already scheduled jobs, before scheduling
* more.
*/
if ((thread = thread_trim_head (&m->ready)) != NULL)
- return thread_run (m, thread, fetch);
+ {
+ fetch = thread_run (m, thread, fetch);
+ if (fetch->ref)
+ *fetch->ref = NULL;
+ pthread_mutex_unlock (&m->mtx);
+ return fetch;
+ }
/* To be fair to all kinds of threads, and avoid starvation, we
* need to be careful to consider all thread types for scheduling
@@ -1163,13 +1214,6 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
/* Normal event are the next highest priority. */
thread_process (&m->event);
- /* Structure copy. */
-#if !defined(HAVE_POLL_CALL)
- readfd = fd_copy_fd_set(m->handler.readfd);
- writefd = fd_copy_fd_set(m->handler.writefd);
- exceptfd = fd_copy_fd_set(m->handler.exceptfd);
-#endif
-
/* Calculate select wait timer if nothing else to do */
if (m->ready.count == 0)
{
@@ -1187,26 +1231,37 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
timer_wait = &timer_val;
}
- num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
-
+ unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp;
+ memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd));
+
+ pthread_mutex_unlock (&m->mtx);
+ {
+ num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait);
+ }
+ pthread_mutex_lock (&m->mtx);
+
/* Signals should get quick treatment */
if (num < 0)
{
if (errno == EINTR)
- continue; /* signal received - process it */
- zlog_warn ("select() error: %s", safe_strerror (errno));
+ {
+ pthread_mutex_unlock (&m->mtx);
+ continue; /* signal received - process it */
+ }
+ zlog_warn ("poll() error: %s", safe_strerror (errno));
+ pthread_mutex_unlock (&m->mtx);
return NULL;
}
/* Check foreground timers. Historically, they have had higher
- priority than I/O threads, so let's push them onto the ready
- list in front of the I/O threads. */
+ * priority than I/O threads, so let's push them onto the ready
+ * list in front of the I/O threads. */
monotime(&now);
- thread_timer_process (m->timer, &now);
+ thread_process_timers (m->timer, &now);
/* Got IO, process it */
if (num > 0)
- thread_process_fds (m, &readfd, &writefd, num);
+ thread_process_io (m, m->handler.copy, num, count);
#if 0
/* If any threads were made ready above (I/O or foreground timer),
@@ -1214,15 +1269,32 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
list at this time. If this is code is uncommented, then background
timer threads will not run unless there is nothing else to do. */
if ((thread = thread_trim_head (&m->ready)) != NULL)
- return thread_run (m, thread, fetch);
+ {
+ fetch = thread_run (m, thread, fetch);
+ if (fetch->ref)
+ *fetch->ref = NULL;
+ pthread_mutex_unlock (&m->mtx);
+ return fetch;
+ }
#endif
/* Background timer/events, lowest priority */
- thread_timer_process (m->background, &now);
+ thread_process_timers (m->background, &now);
if ((thread = thread_trim_head (&m->ready)) != NULL)
- return thread_run (m, thread, fetch);
- }
+ {
+ fetch = thread_run (m, thread, fetch);
+ if (fetch->ref)
+ *fetch->ref = NULL;
+ pthread_mutex_unlock (&m->mtx);
+ return fetch;
+ }
+
+ pthread_mutex_unlock (&m->mtx);
+
+ } while (m->spin);
+
+ return NULL;
}
unsigned long
@@ -1247,13 +1319,23 @@ thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
int
thread_should_yield (struct thread *thread)
{
- return monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
+ int result;
+ pthread_mutex_lock (&thread->mtx);
+ {
+ result = monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
+ }
+ pthread_mutex_unlock (&thread->mtx);
+ return result;
}
void
thread_set_yield_time (struct thread *thread, unsigned long yield_time)
{
- thread->yield = yield_time;
+ pthread_mutex_lock (&thread->mtx);
+ {
+ thread->yield = yield_time;
+ }
+ pthread_mutex_unlock (&thread->mtx);
}
void
@@ -1311,7 +1393,7 @@ thread_call (struct thread *thread)
}
/* Execute thread */
-struct thread *
+void
funcname_thread_execute (struct thread_master *m,
int (*func)(struct thread *),
void *arg,
@@ -1323,6 +1405,7 @@ funcname_thread_execute (struct thread_master *m,
memset (&dummy, 0, sizeof (struct thread));
+ pthread_mutex_init (&dummy.mtx, NULL);
dummy.type = THREAD_EVENT;
dummy.add_type = THREAD_EXECUTE;
dummy.master = NULL;
@@ -1331,13 +1414,15 @@ funcname_thread_execute (struct thread_master *m,
tmp.func = dummy.func = func;
tmp.funcname = dummy.funcname = funcname;
- dummy.hist = hash_get (cpu_record, &tmp,
- (void * (*) (void *))cpu_record_hash_alloc);
+ pthread_mutex_lock (&cpu_record_mtx);
+ {
+ dummy.hist = hash_get (cpu_record, &tmp,
+ (void * (*) (void *))cpu_record_hash_alloc);
+ }
+ pthread_mutex_unlock (&cpu_record_mtx);
dummy.schedfrom = schedfrom;
dummy.schedfrom_line = fromln;
thread_call (&dummy);
-
- return NULL;
}