diff options
Diffstat (limited to 'lib/thread.c')
| -rw-r--r-- | lib/thread.c | 442 |
1 files changed, 127 insertions, 315 deletions
diff --git a/lib/thread.c b/lib/thread.c index ccb635a87d..bf3500fd8b 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -30,6 +30,7 @@ #include "pqueue.h" #include "command.h" #include "sigevent.h" +#include "network.h" DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread") DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master") @@ -40,6 +41,12 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats") #include <mach/mach_time.h> #endif +#define AWAKEN(m) \ + do { \ + static unsigned char wakebyte = 0x01; \ + write (m->io_pipe[1], &wakebyte, 1); \ + } while (0); + static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER; static struct hash *cpu_record = NULL; @@ -89,13 +96,12 @@ vty_out_cpu_thread_history(struct vty* vty, a->total_active, a->cpu.total/1000, a->cpu.total%1000, a->total_calls, a->cpu.total/a->total_calls, a->cpu.max, a->real.total/a->total_calls, a->real.max); - vty_out(vty, " %c%c%c%c%c%c %s%s", + vty_out(vty, " %c%c%c%c%c %s%s", a->types & (1 << THREAD_READ) ? 'R':' ', a->types & (1 << THREAD_WRITE) ? 'W':' ', a->types & (1 << THREAD_TIMER) ? 'T':' ', a->types & (1 << THREAD_EVENT) ? 'E':' ', a->types & (1 << THREAD_EXECUTE) ? 'X':' ', - a->types & (1 << THREAD_BACKGROUND) ? 'B' : ' ', a->funcname, VTY_NEWLINE); } @@ -188,10 +194,6 @@ DEFUN (show_thread_cpu, case 'X': filter |= (1 << THREAD_EXECUTE); break; - case 'b': - case 'B': - filter |= (1 << THREAD_BACKGROUND); - break; default: break; } @@ -280,10 +282,6 @@ DEFUN (clear_thread_cpu, case 'X': filter |= (1 << THREAD_EXECUTE); break; - case 'b': - case 'B': - filter |= (1 << THREAD_BACKGROUND); - break; default: break; } @@ -372,19 +370,22 @@ thread_master_create (void) /* Initialize the timer queues */ rv->timer = pqueue_create(); - rv->background = pqueue_create(); - rv->timer->cmp = rv->background->cmp = thread_timer_cmp; - rv->timer->update = rv->background->update = thread_timer_update; + rv->timer->cmp = thread_timer_cmp; + rv->timer->update = thread_timer_update; rv->spin = true; rv->handle_signals = true; rv->owner = pthread_self(); + pipe (rv->io_pipe); + set_nonblocking (rv->io_pipe[0]); + set_nonblocking (rv->io_pipe[1]); -#if defined(HAVE_POLL_CALL) rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct pollfd) * rv->handler.pfdsize); -#endif + rv->handler.copy = XCALLOC (MTYPE_THREAD_MASTER, + sizeof (struct pollfd) * rv->handler.pfdsize); + return rv; } @@ -419,18 +420,6 @@ thread_list_delete (struct thread_list *list, struct thread *thread) return thread; } -static void -thread_delete_fd (struct thread **thread_array, struct thread *thread) -{ - thread_array[thread->u.fd] = NULL; -} - -static void -thread_add_fd (struct thread **thread_array, struct thread *thread) -{ - thread_array[thread->u.fd] = thread; -} - /* Thread list is empty or not. */ static int thread_empty (struct thread_list *list) @@ -541,12 +530,12 @@ thread_master_free (struct thread_master *m) thread_list_free (m, &m->event); thread_list_free (m, &m->ready); thread_list_free (m, &m->unuse); - thread_queue_free (m, m->background); pthread_mutex_destroy (&m->mtx); + close (m->io_pipe[0]); + close (m->io_pipe[1]); -#if defined(HAVE_POLL_CALL) XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); -#endif + XFREE (MTYPE_THREAD_MASTER, m->handler.copy); XFREE (MTYPE_THREAD_MASTER, m); pthread_mutex_lock (&cpu_record_mtx); @@ -646,77 +635,21 @@ thread_get (struct thread_master *m, u_char type, return thread; } -#if defined (HAVE_POLL_CALL) - -#define fd_copy_fd_set(X) (X) - -/* generic add thread function */ -static struct thread * -generic_thread_add(struct thread_master *m, int (*func) (struct thread *), - void *arg, int fd, int dir, debugargdef) -{ - struct thread *thread; - - u_char type; - short int event; - - if (dir == THREAD_READ) - { - event = (POLLIN | POLLHUP); - type = THREAD_READ; - } - else - { - event = (POLLOUT | POLLHUP); - type = THREAD_WRITE; - } - - nfds_t queuepos = m->handler.pfdcount; - nfds_t i=0; - for (i=0; i<m->handler.pfdcount; i++) - if (m->handler.pfds[i].fd == fd) - { - queuepos = i; - break; - } - - /* is there enough space for a new fd? */ - assert (queuepos < m->handler.pfdsize); - - thread = thread_get (m, type, func, arg, debugargpass); - m->handler.pfds[queuepos].fd = fd; - m->handler.pfds[queuepos].events |= event; - if (queuepos == m->handler.pfdcount) - m->handler.pfdcount++; - - return thread; -} -#else - -#define fd_copy_fd_set(X) (X) -#endif - static int -fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait) +fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, + nfds_t count, struct timeval *timer_wait) { - int num; - - /* If timer_wait is null here, that means either select() or poll() should - * block indefinitely, unless the thread_master has overriden it. select() - * and poll() differ in the timeout values they interpret as an indefinite - * block; select() requires a null pointer, while poll takes a millisecond - * value of -1. - * - * The thread_master owner has the option of overriding the default behavior - * by setting ->selectpoll_timeout. If the value is positive, it specifies - * the maximum number of milliseconds to wait. If the timeout is -1, it - * specifies that we should never wait and always return immediately even if - * no event is detected. If the value is zero, the behavior is default. - */ - -#if defined(HAVE_POLL_CALL) + /* If timer_wait is null here, that means poll() should block indefinitely, + * unless the thread_master has overriden it by setting ->selectpoll_timeout. + * If the value is positive, it specifies the maximum number of milliseconds + * to wait. If the timeout is -1, it specifies that we should never wait and + * always return immediately even if no event is detected. If the value is + * zero, the behavior is default. */ int timeout = -1; + /* number of file descriptors with events */ + int num; + if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000); else if (m->selectpoll_timeout > 0) // use the user's timeout @@ -724,56 +657,19 @@ fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set else if (m->selectpoll_timeout < 0) // effect a poll (return immediately) timeout = 0; - num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout); -#else - struct timeval timeout; + /* add poll pipe poker */ + assert (count + 1 < pfdsize); + pfds[count].fd = m->io_pipe[0]; + pfds[count].events = POLLIN; + pfds[count].revents = 0x00; - if (m->selectpoll_timeout > 0) // use the user's timeout - { - timeout.tv_sec = m->selectpoll_timeout / 1000; - timeout.tv_usec = (m->selectpoll_timeout % 1000) * 1000; - timer_wait = &timeout; - } - else if (m->selectpoll_timeout < 0) // effect a poll (return immediately) - { - timeout.tv_sec = 0; - timeout.tv_usec = 0; - timer_wait = &timeout; - } - num = select (size, read, write, except, timer_wait); -#endif + num = poll (pfds, count + 1, timeout); - return num; -} + static unsigned char trash[64]; + if (num > 0 && pfds[count].revents != 0 && num--) + while (read (m->io_pipe[0], &trash, sizeof (trash)) > 0); -static int -fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos) -{ -#if defined(HAVE_POLL_CALL) - return 1; -#else - return FD_ISSET (THREAD_FD (thread), fdset); -#endif -} - -static int -fd_clear_read_write (struct thread *thread) -{ -#if !defined(HAVE_POLL_CALL) - thread_fd_set *fdset = NULL; - int fd = THREAD_FD (thread); - - if (thread->type == THREAD_READ) - fdset = &thread->master->handler.readfd; - else - fdset = &thread->master->handler.writefd; - - if (!FD_ISSET (fd, fdset)) - return 0; - - FD_CLR (fd, fdset); -#endif - return 1; + return num; } /* Add new read thread. */ @@ -792,32 +688,27 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, return NULL; } -#if defined (HAVE_POLL_CALL) - thread = generic_thread_add(m, func, arg, fd, dir, debugargpass); -#else - if (fd >= FD_SETSIZE) - { - zlog_err ("File descriptor %d is >= FD_SETSIZE (%d). Please recompile" - "with --enable-poll=yes", fd, FD_SETSIZE); - assert (fd < FD_SETSIZE && !"fd >= FD_SETSIZE"); - } - thread_fd_set *fdset = NULL; - if (dir == THREAD_READ) - fdset = &m->handler.readfd; - else - fdset = &m->handler.writefd; + /* default to a new pollfd */ + nfds_t queuepos = m->handler.pfdcount; - if (FD_ISSET (fd, fdset)) - { - zlog_warn ("There is already %s fd [%d]", - (dir == THREAD_READ) ? "read" : "write", fd); - } - else - { - FD_SET (fd, fdset); - thread = thread_get (m, dir, func, arg, debugargpass); - } -#endif + /* if we already have a pollfd for our file descriptor, find and use it */ + for (nfds_t i = 0; i < m->handler.pfdcount; i++) + if (m->handler.pfds[i].fd == fd) + { + queuepos = i; + break; + } + + /* make sure we have room for this fd + pipe poker fd */ + assert (queuepos + 1 < m->handler.pfdsize); + + thread = thread_get (m, dir, func, arg, debugargpass); + + m->handler.pfds[queuepos].fd = fd; + m->handler.pfds[queuepos].events |= (dir == THREAD_READ ? POLLIN : POLLOUT); + + if (queuepos == m->handler.pfdcount) + m->handler.pfdcount++; if (thread) { @@ -825,9 +716,9 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, { thread->u.fd = fd; if (dir == THREAD_READ) - thread_add_fd (m->read, thread); + m->read[thread->u.fd] = thread; else - thread_add_fd (m->write, thread); + m->write[thread->u.fd] = thread; } pthread_mutex_unlock (&thread->mtx); @@ -837,6 +728,8 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, thread->ref = t_ptr; } } + + AWAKEN (m); } pthread_mutex_unlock (&m->mtx); @@ -853,7 +746,7 @@ funcname_thread_add_timer_timeval (struct thread_master *m, assert (m != NULL); - assert (type == THREAD_TIMER || type == THREAD_BACKGROUND); + assert (type == THREAD_TIMER); assert (time_relative); pthread_mutex_lock (&m->mtx); @@ -864,7 +757,7 @@ funcname_thread_add_timer_timeval (struct thread_master *m, return NULL; } - queue = ((type == THREAD_TIMER) ? m->timer : m->background); + queue = m->timer; thread = thread_get (m, type, func, arg, debugargpass); pthread_mutex_lock (&thread->mtx); @@ -879,6 +772,8 @@ funcname_thread_add_timer_timeval (struct thread_master *m, } } pthread_mutex_unlock (&thread->mtx); + + AWAKEN (m); } pthread_mutex_unlock (&m->mtx); @@ -930,31 +825,6 @@ funcname_thread_add_timer_tv (struct thread_master *m, t_ptr, debugargpass); } -/* Add a background thread, with an optional millisec delay */ -struct thread * -funcname_thread_add_background (struct thread_master *m, - int (*func) (struct thread *), void *arg, long delay, - struct thread **t_ptr, debugargdef) -{ - struct timeval trel; - - assert (m != NULL); - - if (delay) - { - trel.tv_sec = delay / 1000; - trel.tv_usec = 1000*(delay % 1000); - } - else - { - trel.tv_sec = 0; - trel.tv_usec = 0; - } - - return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND, arg, &trel, - t_ptr, debugargpass); -} - /* Add simple event thread. */ struct thread * funcname_thread_add_event (struct thread_master *m, @@ -986,6 +856,8 @@ funcname_thread_add_event (struct thread_master *m, *t_ptr = thread; thread->ref = t_ptr; } + + AWAKEN (m); } pthread_mutex_unlock (&m->mtx); @@ -995,10 +867,7 @@ funcname_thread_add_event (struct thread_master *m, static void thread_cancel_read_or_write (struct thread *thread, short int state) { -#if defined(HAVE_POLL_CALL) - nfds_t i; - - for (i=0;i<thread->master->handler.pfdcount;++i) + for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i) if (thread->master->handler.pfds[i].fd == thread->u.fd) { thread->master->handler.pfds[i].events &= ~(state); @@ -1013,9 +882,6 @@ thread_cancel_read_or_write (struct thread *thread, short int state) return; } } -#endif - - fd_clear_read_write (thread); } /** @@ -1039,19 +905,11 @@ thread_cancel (struct thread *thread) switch (thread->type) { case THREAD_READ: -#if defined (HAVE_POLL_CALL) thread_cancel_read_or_write (thread, POLLIN | POLLHUP); -#else - thread_cancel_read_or_write (thread, 0); -#endif thread_array = thread->master->read; break; case THREAD_WRITE: -#if defined (HAVE_POLL_CALL) thread_cancel_read_or_write (thread, POLLOUT | POLLHUP); -#else - thread_cancel_read_or_write (thread, 0); -#endif thread_array = thread->master->write; break; case THREAD_TIMER: @@ -1063,9 +921,6 @@ thread_cancel (struct thread *thread) case THREAD_READY: list = &thread->master->ready; break; - case THREAD_BACKGROUND: - queue = thread->master->background; - break; default: goto done; break; @@ -1082,7 +937,7 @@ thread_cancel (struct thread *thread) } else if (thread_array) { - thread_delete_fd (thread_array, thread); + thread_array[thread->u.fd] = NULL; } else { @@ -1168,7 +1023,7 @@ thread_timer_wait (struct pqueue *queue, struct timeval *timer_val) static struct thread * thread_run (struct thread_master *m, struct thread *thread, - struct thread *fetch) + struct thread *fetch) { *fetch = *thread; thread_add_unuse (m, thread); @@ -1176,7 +1031,8 @@ thread_run (struct thread_master *m, struct thread *thread, } static int -thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos) +thread_process_io_helper (struct thread_master *m, struct thread *thread, + short state, int pos) { struct thread **thread_array; @@ -1188,76 +1044,60 @@ thread_process_fds_helper (struct thread_master *m, struct thread *thread, threa else thread_array = m->write; - if (fd_is_set (thread, fdset, pos)) - { - fd_clear_read_write (thread); - thread_delete_fd (thread_array, thread); - thread_list_add (&m->ready, thread); - thread->type = THREAD_READY; -#if defined(HAVE_POLL_CALL) - thread->master->handler.pfds[pos].events &= ~(state); -#endif - return 1; - } - return 0; + thread_array[thread->u.fd] = NULL; + thread_list_add (&m->ready, thread); + thread->type = THREAD_READY; + /* if another pthread scheduled this file descriptor for the event we're + * responding to, no problem; we're getting to it now */ + thread->master->handler.pfds[pos].events &= ~(state); + return 1; } -#if defined(HAVE_POLL_CALL) - -/* check poll events */ static void -check_pollfds(struct thread_master *m, fd_set *readfd, int num) +thread_process_io (struct thread_master *m, struct pollfd *pfds, + unsigned int num, unsigned int count) { - nfds_t i = 0; - int ready = 0; - for (i = 0; i < m->handler.pfdcount && ready < num ; ++i) + unsigned int ready = 0; + + for (nfds_t i = 0; i < count && ready < num ; ++i) { - /* no event for current fd? immideatly continue */ - if(m->handler.pfds[i].revents == 0) + /* no event for current fd? immediately continue */ + if (pfds[i].revents == 0) continue; ready++; - /* POLLIN / POLLOUT process event */ - if (m->handler.pfds[i].revents & (POLLIN | POLLHUP)) - thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i); - if (m->handler.pfds[i].revents & POLLOUT) - thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i); - - /* remove fd from list on POLLNVAL */ - if (m->handler.pfds[i].revents & POLLNVAL) + /* Unless someone has called thread_cancel from another pthread, the only + * thing that could have changed in m->handler.pfds while we were + * asleep is the .events field in a given pollfd. Barring thread_cancel() + * that value should be a superset of the values we have in our copy, so + * there's no need to update it. Similarily, barring deletion, the fd + * should still be a valid index into the master's pfds. */ + if (pfds[i].revents & (POLLIN | POLLHUP)) + thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN, i); + if (pfds[i].revents & POLLOUT) + thread_process_io_helper(m, m->write[pfds[i].fd], POLLOUT, i); + + /* if one of our file descriptors is garbage, remove the same from + * both pfds + update sizes and index */ + if (pfds[i].revents & POLLNVAL) { - memmove(m->handler.pfds+i, - m->handler.pfds+i+1, - (m->handler.pfdsize-i-1) * sizeof(struct pollfd)); - m->handler.pfdcount--; - i--; + memmove (m->handler.pfds + i, + m->handler.pfds + i + 1, + (m->handler.pfdcount - i - 1) * sizeof(struct pollfd)); + m->handler.pfdcount--; + + memmove (pfds + i, pfds + i + 1, + (count - i - 1) * sizeof(struct pollfd)); + count--; + i--; } - else - m->handler.pfds[i].revents = 0; } } -#endif - -static void -thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num) -{ -#if defined (HAVE_POLL_CALL) - check_pollfds (m, rset, num); -#else - int ready = 0, index; - - for (index = 0; index < m->fd_limit && ready < num; ++index) - { - ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0); - ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0); - } -#endif -} /* Add all timers that have popped to the ready list. */ static unsigned int -thread_timer_process (struct pqueue *queue, struct timeval *timenow) +thread_process_timers (struct pqueue *queue, struct timeval *timenow) { struct thread *thread; unsigned int ready = 0; @@ -1300,14 +1140,9 @@ struct thread * thread_fetch (struct thread_master *m, struct thread *fetch) { struct thread *thread; - thread_fd_set readfd; - thread_fd_set writefd; - thread_fd_set exceptfd; struct timeval now; struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 }; - struct timeval timer_val_bg; struct timeval *timer_wait = &timer_val; - struct timeval *timer_wait_bg; do { @@ -1338,22 +1173,10 @@ thread_fetch (struct thread_master *m, struct thread *fetch) /* Normal event are the next highest priority. */ thread_process (&m->event); - /* Structure copy. */ -#if !defined(HAVE_POLL_CALL) - readfd = fd_copy_fd_set(m->handler.readfd); - writefd = fd_copy_fd_set(m->handler.writefd); - exceptfd = fd_copy_fd_set(m->handler.exceptfd); -#endif - /* Calculate select wait timer if nothing else to do */ if (m->ready.count == 0) { timer_wait = thread_timer_wait (m->timer, &timer_val); - timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg); - - if (timer_wait_bg && - (!timer_wait || (timercmp (timer_wait, timer_wait_bg, >)))) - timer_wait = timer_wait_bg; } if (timer_wait && timer_wait->tv_sec < 0) @@ -1362,8 +1185,15 @@ thread_fetch (struct thread_master *m, struct thread *fetch) timer_wait = &timer_val; } - num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait); - + unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp; + memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd)); + + pthread_mutex_unlock (&m->mtx); + { + num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait); + } + pthread_mutex_lock (&m->mtx); + /* Signals should get quick treatment */ if (num < 0) { @@ -1372,39 +1202,21 @@ thread_fetch (struct thread_master *m, struct thread *fetch) pthread_mutex_unlock (&m->mtx); continue; /* signal received - process it */ } - zlog_warn ("select() error: %s", safe_strerror (errno)); + zlog_warn ("poll() error: %s", safe_strerror (errno)); pthread_mutex_unlock (&m->mtx); return NULL; } /* Check foreground timers. Historically, they have had higher - priority than I/O threads, so let's push them onto the ready - list in front of the I/O threads. */ + * priority than I/O threads, so let's push them onto the ready + * list in front of the I/O threads. */ monotime(&now); - thread_timer_process (m->timer, &now); + thread_process_timers (m->timer, &now); /* Got IO, process it */ if (num > 0) - thread_process_fds (m, &readfd, &writefd, num); - -#if 0 - /* If any threads were made ready above (I/O or foreground timer), - perhaps we should avoid adding background timers to the ready - list at this time. If this is code is uncommented, then background - timer threads will not run unless there is nothing else to do. */ - if ((thread = thread_trim_head (&m->ready)) != NULL) - { - fetch = thread_run (m, thread, fetch); - if (fetch->ref) - *fetch->ref = NULL; - pthread_mutex_unlock (&m->mtx); - return fetch; - } -#endif + thread_process_io (m, m->handler.copy, num, count); - /* Background timer/events, lowest priority */ - thread_timer_process (m->background, &now); - if ((thread = thread_trim_head (&m->ready)) != NULL) { fetch = thread_run (m, thread, fetch); |
