From 75bcb3558d25b8ca7d3383f5c2c648d0aceae103 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 10 May 2017 18:09:49 +0000 Subject: lib: remove select() poll() is present on every supported platform and does not have an upper limit on file descriptors. Signed-off-by: Quentin Young [DL: split off from AWAKEN() change] --- lib/thread.c | 305 +++++++++++++++-------------------------------------------- 1 file changed, 74 insertions(+), 231 deletions(-) (limited to 'lib/thread.c') diff --git a/lib/thread.c b/lib/thread.c index ccb635a87d..7f58aea789 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -379,12 +379,11 @@ thread_master_create (void) rv->handle_signals = true; rv->owner = pthread_self(); -#if defined(HAVE_POLL_CALL) rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct pollfd) * rv->handler.pfdsize); -#endif + return rv; } @@ -419,18 +418,6 @@ thread_list_delete (struct thread_list *list, struct thread *thread) return thread; } -static void -thread_delete_fd (struct thread **thread_array, struct thread *thread) -{ - thread_array[thread->u.fd] = NULL; -} - -static void -thread_add_fd (struct thread **thread_array, struct thread *thread) -{ - thread_array[thread->u.fd] = thread; -} - /* Thread list is empty or not. */ static int thread_empty (struct thread_list *list) @@ -544,9 +531,7 @@ thread_master_free (struct thread_master *m) thread_queue_free (m, m->background); pthread_mutex_destroy (&m->mtx); -#if defined(HAVE_POLL_CALL) XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); -#endif XFREE (MTYPE_THREAD_MASTER, m); pthread_mutex_lock (&cpu_record_mtx); @@ -646,58 +631,9 @@ thread_get (struct thread_master *m, u_char type, return thread; } -#if defined (HAVE_POLL_CALL) - -#define fd_copy_fd_set(X) (X) - -/* generic add thread function */ -static struct thread * -generic_thread_add(struct thread_master *m, int (*func) (struct thread *), - void *arg, int fd, int dir, debugargdef) -{ - struct thread *thread; - - u_char type; - short int event; - - if (dir == THREAD_READ) - { - event = (POLLIN | POLLHUP); - type = THREAD_READ; - } - else - { - event = (POLLOUT | POLLHUP); - type = THREAD_WRITE; - } - - nfds_t queuepos = m->handler.pfdcount; - nfds_t i=0; - for (i=0; ihandler.pfdcount; i++) - if (m->handler.pfds[i].fd == fd) - { - queuepos = i; - break; - } - - /* is there enough space for a new fd? */ - assert (queuepos < m->handler.pfdsize); - - thread = thread_get (m, type, func, arg, debugargpass); - m->handler.pfds[queuepos].fd = fd; - m->handler.pfds[queuepos].events |= event; - if (queuepos == m->handler.pfdcount) - m->handler.pfdcount++; - - return thread; -} -#else - -#define fd_copy_fd_set(X) (X) -#endif - static int -fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait) +fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, + nfds_t count, struct timeval *timer_wait) { int num; @@ -714,7 +650,6 @@ fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set * no event is detected. If the value is zero, the behavior is default. */ -#if defined(HAVE_POLL_CALL) int timeout = -1; if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value @@ -725,57 +660,10 @@ fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set timeout = 0; num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout); -#else - struct timeval timeout; - - if (m->selectpoll_timeout > 0) // use the user's timeout - { - timeout.tv_sec = m->selectpoll_timeout / 1000; - timeout.tv_usec = (m->selectpoll_timeout % 1000) * 1000; - timer_wait = &timeout; - } - else if (m->selectpoll_timeout < 0) // effect a poll (return immediately) - { - timeout.tv_sec = 0; - timeout.tv_usec = 0; - timer_wait = &timeout; - } - num = select (size, read, write, except, timer_wait); -#endif return num; } -static int -fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos) -{ -#if defined(HAVE_POLL_CALL) - return 1; -#else - return FD_ISSET (THREAD_FD (thread), fdset); -#endif -} - -static int -fd_clear_read_write (struct thread *thread) -{ -#if !defined(HAVE_POLL_CALL) - thread_fd_set *fdset = NULL; - int fd = THREAD_FD (thread); - - if (thread->type == THREAD_READ) - fdset = &thread->master->handler.readfd; - else - fdset = &thread->master->handler.writefd; - - if (!FD_ISSET (fd, fdset)) - return 0; - - FD_CLR (fd, fdset); -#endif - return 1; -} - /* Add new read thread. */ struct thread * funcname_thread_add_read_write (int dir, struct thread_master *m, @@ -792,32 +680,26 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, return NULL; } -#if defined (HAVE_POLL_CALL) - thread = generic_thread_add(m, func, arg, fd, dir, debugargpass); -#else - if (fd >= FD_SETSIZE) - { - zlog_err ("File descriptor %d is >= FD_SETSIZE (%d). Please recompile" - "with --enable-poll=yes", fd, FD_SETSIZE); - assert (fd < FD_SETSIZE && !"fd >= FD_SETSIZE"); - } - thread_fd_set *fdset = NULL; - if (dir == THREAD_READ) - fdset = &m->handler.readfd; - else - fdset = &m->handler.writefd; + /* default to a new pollfd */ + nfds_t queuepos = m->handler.pfdcount; - if (FD_ISSET (fd, fdset)) - { - zlog_warn ("There is already %s fd [%d]", - (dir == THREAD_READ) ? "read" : "write", fd); - } - else - { - FD_SET (fd, fdset); - thread = thread_get (m, dir, func, arg, debugargpass); - } -#endif + /* if we already have a pollfd for our file descriptor, find and use it */ + for (nfds_t i = 0; i < m->handler.pfdcount; i++) + if (m->handler.pfds[i].fd == fd) + { + queuepos = i; + break; + } + + assert (queuepos < m->handler.pfdsize); + + thread = thread_get (m, dir, func, arg, debugargpass); + + m->handler.pfds[queuepos].fd = fd; + m->handler.pfds[queuepos].events |= (dir == THREAD_READ ? POLLIN : POLLOUT); + + if (queuepos == m->handler.pfdcount) + m->handler.pfdcount++; if (thread) { @@ -825,9 +707,9 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, { thread->u.fd = fd; if (dir == THREAD_READ) - thread_add_fd (m->read, thread); + m->read[thread->u.fd] = thread; else - thread_add_fd (m->write, thread); + m->write[thread->u.fd] = thread; } pthread_mutex_unlock (&thread->mtx); @@ -995,10 +877,7 @@ funcname_thread_add_event (struct thread_master *m, static void thread_cancel_read_or_write (struct thread *thread, short int state) { -#if defined(HAVE_POLL_CALL) - nfds_t i; - - for (i=0;imaster->handler.pfdcount;++i) + for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i) if (thread->master->handler.pfds[i].fd == thread->u.fd) { thread->master->handler.pfds[i].events &= ~(state); @@ -1013,9 +892,6 @@ thread_cancel_read_or_write (struct thread *thread, short int state) return; } } -#endif - - fd_clear_read_write (thread); } /** @@ -1039,19 +915,11 @@ thread_cancel (struct thread *thread) switch (thread->type) { case THREAD_READ: -#if defined (HAVE_POLL_CALL) thread_cancel_read_or_write (thread, POLLIN | POLLHUP); -#else - thread_cancel_read_or_write (thread, 0); -#endif thread_array = thread->master->read; break; case THREAD_WRITE: -#if defined (HAVE_POLL_CALL) thread_cancel_read_or_write (thread, POLLOUT | POLLHUP); -#else - thread_cancel_read_or_write (thread, 0); -#endif thread_array = thread->master->write; break; case THREAD_TIMER: @@ -1082,7 +950,7 @@ thread_cancel (struct thread *thread) } else if (thread_array) { - thread_delete_fd (thread_array, thread); + thread_array[thread->u.fd] = NULL; } else { @@ -1168,7 +1036,7 @@ thread_timer_wait (struct pqueue *queue, struct timeval *timer_val) static struct thread * thread_run (struct thread_master *m, struct thread *thread, - struct thread *fetch) + struct thread *fetch) { *fetch = *thread; thread_add_unuse (m, thread); @@ -1176,7 +1044,8 @@ thread_run (struct thread_master *m, struct thread *thread, } static int -thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos) +thread_process_io_helper (struct thread_master *m, struct thread *thread, + short state, int pos) { struct thread **thread_array; @@ -1188,76 +1057,60 @@ thread_process_fds_helper (struct thread_master *m, struct thread *thread, threa else thread_array = m->write; - if (fd_is_set (thread, fdset, pos)) - { - fd_clear_read_write (thread); - thread_delete_fd (thread_array, thread); - thread_list_add (&m->ready, thread); - thread->type = THREAD_READY; -#if defined(HAVE_POLL_CALL) - thread->master->handler.pfds[pos].events &= ~(state); -#endif - return 1; - } - return 0; + thread_array[thread->u.fd] = NULL; + thread_list_add (&m->ready, thread); + thread->type = THREAD_READY; + /* if another pthread scheduled this file descriptor for the event we're + * responding to, no problem; we're getting to it now */ + thread->master->handler.pfds[pos].events &= ~(state); + return 1; } -#if defined(HAVE_POLL_CALL) - -/* check poll events */ static void -check_pollfds(struct thread_master *m, fd_set *readfd, int num) +thread_process_io (struct thread_master *m, struct pollfd *pfds, + unsigned int num, unsigned int count) { - nfds_t i = 0; - int ready = 0; - for (i = 0; i < m->handler.pfdcount && ready < num ; ++i) + unsigned int ready = 0; + + for (nfds_t i = 0; i < count && ready < num ; ++i) { - /* no event for current fd? immideatly continue */ - if(m->handler.pfds[i].revents == 0) + /* no event for current fd? immediately continue */ + if (pfds[i].revents == 0) continue; ready++; - /* POLLIN / POLLOUT process event */ - if (m->handler.pfds[i].revents & (POLLIN | POLLHUP)) - thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i); - if (m->handler.pfds[i].revents & POLLOUT) - thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i); - - /* remove fd from list on POLLNVAL */ - if (m->handler.pfds[i].revents & POLLNVAL) + /* Unless someone has called thread_cancel from another pthread, the only + * thing that could have changed in m->handler.pfds while we were + * asleep is the .events field in a given pollfd. Barring thread_cancel() + * that value should be a superset of the values we have in our copy, so + * there's no need to update it. Similarily, barring deletion, the fd + * should still be a valid index into the master's pfds. */ + if (pfds[i].revents & (POLLIN | POLLHUP)) + thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN, i); + if (pfds[i].revents & POLLOUT) + thread_process_io_helper(m, m->write[pfds[i].fd], POLLOUT, i); + + /* if one of our file descriptors is garbage, remove the same from + * both pfds + update sizes and index */ + if (pfds[i].revents & POLLNVAL) { - memmove(m->handler.pfds+i, - m->handler.pfds+i+1, - (m->handler.pfdsize-i-1) * sizeof(struct pollfd)); - m->handler.pfdcount--; - i--; + memmove (m->handler.pfds + i, + m->handler.pfds + i + 1, + (m->handler.pfdcount - i - 1) * sizeof(struct pollfd)); + m->handler.pfdcount--; + + memmove (pfds + i, pfds + i + 1, + (count - i - 1) * sizeof(struct pollfd)); + count--; + i--; } - else - m->handler.pfds[i].revents = 0; - } -} -#endif - -static void -thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num) -{ -#if defined (HAVE_POLL_CALL) - check_pollfds (m, rset, num); -#else - int ready = 0, index; - - for (index = 0; index < m->fd_limit && ready < num; ++index) - { - ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0); - ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0); } -#endif } /* Add all timers that have popped to the ready list. */ static unsigned int -thread_timer_process (struct pqueue *queue, struct timeval *timenow) +thread_process_timers (struct pqueue *queue, struct timeval *timenow) { struct thread *thread; unsigned int ready = 0; @@ -1300,9 +1153,6 @@ struct thread * thread_fetch (struct thread_master *m, struct thread *fetch) { struct thread *thread; - thread_fd_set readfd; - thread_fd_set writefd; - thread_fd_set exceptfd; struct timeval now; struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 }; struct timeval timer_val_bg; @@ -1338,13 +1188,6 @@ thread_fetch (struct thread_master *m, struct thread *fetch) /* Normal event are the next highest priority. */ thread_process (&m->event); - /* Structure copy. */ -#if !defined(HAVE_POLL_CALL) - readfd = fd_copy_fd_set(m->handler.readfd); - writefd = fd_copy_fd_set(m->handler.writefd); - exceptfd = fd_copy_fd_set(m->handler.exceptfd); -#endif - /* Calculate select wait timer if nothing else to do */ if (m->ready.count == 0) { @@ -1362,7 +1205,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch) timer_wait = &timer_val; } - num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait); + num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait); /* Signals should get quick treatment */ if (num < 0) @@ -1372,20 +1215,20 @@ thread_fetch (struct thread_master *m, struct thread *fetch) pthread_mutex_unlock (&m->mtx); continue; /* signal received - process it */ } - zlog_warn ("select() error: %s", safe_strerror (errno)); + zlog_warn ("poll() error: %s", safe_strerror (errno)); pthread_mutex_unlock (&m->mtx); return NULL; } /* Check foreground timers. Historically, they have had higher - priority than I/O threads, so let's push them onto the ready - list in front of the I/O threads. */ + * priority than I/O threads, so let's push them onto the ready + * list in front of the I/O threads. */ monotime(&now); - thread_timer_process (m->timer, &now); + thread_process_timers (m->timer, &now); /* Got IO, process it */ if (num > 0) - thread_process_fds (m, &readfd, &writefd, num); + thread_process_io (m, pfds, num, count); #if 0 /* If any threads were made ready above (I/O or foreground timer), @@ -1403,7 +1246,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch) #endif /* Background timer/events, lowest priority */ - thread_timer_process (m->background, &now); + thread_process_timers (m->background, &now); if ((thread = thread_trim_head (&m->ready)) != NULL) { -- cgit v1.2.3 From 3bf2673b3094727edee168f4b5a93095420f6b23 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 10 May 2017 18:09:49 +0000 Subject: lib: allow pthreads to poke poll() When scheduling a task onto a thread master owned by another pthread, we need to lock the thread master's mutex. However, if the pthread which owns that thread master is in poll(), we could be stuck waiting for a very long time. To solve this, we copy all data poll() needs and unlock during poll(). To break the target pthread out of poll(), thread_master has gained a pipe whose reading end is passed into poll(). After an event that requires immediate action by the target pthread, a byte is written into the pipe in order to wake it up. Signed-off-by: Quentin Young [DL: split off from select() removal] --- lib/thread.c | 66 ++++++++++++++++++++++++++++++++++++++++++++---------------- lib/thread.h | 1 + 2 files changed, 50 insertions(+), 17 deletions(-) (limited to 'lib/thread.c') diff --git a/lib/thread.c b/lib/thread.c index 7f58aea789..4077358dcf 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -30,6 +30,7 @@ #include "pqueue.h" #include "command.h" #include "sigevent.h" +#include "network.h" DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread") DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master") @@ -40,6 +41,12 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats") #include #endif +#define AWAKEN(m) \ + do { \ + static unsigned char wakebyte = 0x01; \ + write (m->io_pipe[1], &wakebyte, 1); \ + } while (0); + static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER; static struct hash *cpu_record = NULL; @@ -378,6 +385,8 @@ thread_master_create (void) rv->spin = true; rv->handle_signals = true; rv->owner = pthread_self(); + pipe (rv->io_pipe); + set_nonblocking (rv->io_pipe[0]); rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; @@ -530,6 +539,8 @@ thread_master_free (struct thread_master *m) thread_list_free (m, &m->unuse); thread_queue_free (m, m->background); pthread_mutex_destroy (&m->mtx); + close (m->io_pipe[0]); + close (m->io_pipe[1]); XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); XFREE (MTYPE_THREAD_MASTER, m); @@ -635,23 +646,20 @@ static int fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, nfds_t count, struct timeval *timer_wait) { - int num; - - /* If timer_wait is null here, that means either select() or poll() should - * block indefinitely, unless the thread_master has overriden it. select() - * and poll() differ in the timeout values they interpret as an indefinite - * block; select() requires a null pointer, while poll takes a millisecond - * value of -1. - * - * The thread_master owner has the option of overriding the default behavior - * by setting ->selectpoll_timeout. If the value is positive, it specifies - * the maximum number of milliseconds to wait. If the timeout is -1, it - * specifies that we should never wait and always return immediately even if - * no event is detected. If the value is zero, the behavior is default. - */ + if (count == 0) + return 0; + /* If timer_wait is null here, that means poll() should block indefinitely, + * unless the thread_master has overriden it by setting ->selectpoll_timeout. + * If the value is positive, it specifies the maximum number of milliseconds + * to wait. If the timeout is -1, it specifies that we should never wait and + * always return immediately even if no event is detected. If the value is + * zero, the behavior is default. */ int timeout = -1; + /* number of file descriptors with events */ + int num; + if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000); else if (m->selectpoll_timeout > 0) // use the user's timeout @@ -659,7 +667,17 @@ fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, else if (m->selectpoll_timeout < 0) // effect a poll (return immediately) timeout = 0; - num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout); + /* add poll pipe poker */ + assert (count + 1 < pfdsize); + pfds[count].fd = m->io_pipe[0]; + pfds[count].events = POLLIN; + pfds[count].revents = 0x00; + + num = poll (pfds, count + 1, timeout); + + static unsigned char trash[64]; + if (num > 0 && pfds[count].revents != 0 && num--) + while (read (m->io_pipe[0], &trash, sizeof (trash)) > 0); return num; } @@ -691,7 +709,8 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, break; } - assert (queuepos < m->handler.pfdsize); + /* make sure we have room for this fd + pipe poker fd */ + assert (queuepos + 1 < m->handler.pfdsize); thread = thread_get (m, dir, func, arg, debugargpass); @@ -761,6 +780,8 @@ funcname_thread_add_timer_timeval (struct thread_master *m, } } pthread_mutex_unlock (&thread->mtx); + + AWAKEN (m); } pthread_mutex_unlock (&m->mtx); @@ -868,6 +889,8 @@ funcname_thread_add_event (struct thread_master *m, *t_ptr = thread; thread->ref = t_ptr; } + + AWAKEN (m); } pthread_mutex_unlock (&m->mtx); @@ -1205,7 +1228,16 @@ thread_fetch (struct thread_master *m, struct thread *fetch) timer_wait = &timer_val; } - num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait); + /* copy pollfds so we can unlock during blocking calls to poll() */ + struct pollfd pfds[m->handler.pfdsize]; + unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp; + memcpy (pfds, m->handler.pfds, count * sizeof (struct pollfd)); + + pthread_mutex_unlock (&m->mtx); + { + num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait); + } + pthread_mutex_lock (&m->mtx); /* Signals should get quick treatment */ if (num < 0) diff --git a/lib/thread.h b/lib/thread.h index e40bf64855..753aa41ffd 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -66,6 +66,7 @@ struct thread_master struct thread_list ready; struct thread_list unuse; struct pqueue *background; + int io_pipe[2]; int fd_limit; struct fd_handler handler; unsigned long alloc; -- cgit v1.2.3 From a772d6eae61ccb090de8dda5df43e2770879baa0 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Thu, 18 May 2017 18:14:52 +0000 Subject: lib: missed AWAKEN in r/w thread scheduler Signed-off-by: Quentin Young --- lib/thread.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'lib/thread.c') diff --git a/lib/thread.c b/lib/thread.c index 4077358dcf..0188ae6c0b 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -738,6 +738,8 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, thread->ref = t_ptr; } } + + AWAKEN (m); } pthread_mutex_unlock (&m->mtx); -- cgit v1.2.3 From 8c88ac94fa8b847643eb03cbce2faa3e4d16d6a3 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 31 May 2017 17:30:53 +0000 Subject: lib: make writing end of pipe nonblocking Signed-off-by: Quentin Young --- lib/thread.c | 1 + 1 file changed, 1 insertion(+) (limited to 'lib/thread.c') diff --git a/lib/thread.c b/lib/thread.c index 0188ae6c0b..848e39e1ae 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -387,6 +387,7 @@ thread_master_create (void) rv->owner = pthread_self(); pipe (rv->io_pipe); set_nonblocking (rv->io_pipe[0]); + set_nonblocking (rv->io_pipe[1]); rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; -- cgit v1.2.3 From 95db01eb22d4afe43a7f37243077f24b0b90ae9d Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 31 May 2017 23:21:40 +0000 Subject: lib: use heap for pollfds a bunch of pollfds can cause a stack overflow when using a stack allocated buffer...silly me... Signed-off-by: Quentin Young --- lib/thread.c | 13 +++++++------ lib/thread.h | 3 +++ 2 files changed, 10 insertions(+), 6 deletions(-) (limited to 'lib/thread.c') diff --git a/lib/thread.c b/lib/thread.c index 848e39e1ae..2f15659a18 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -393,6 +393,8 @@ thread_master_create (void) rv->handler.pfdcount = 0; rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct pollfd) * rv->handler.pfdsize); + rv->handler.copy = XCALLOC (MTYPE_THREAD_MASTER, + sizeof (struct pollfd) * rv->handler.pfdsize); return rv; } @@ -544,6 +546,7 @@ thread_master_free (struct thread_master *m) close (m->io_pipe[1]); XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); + XFREE (MTYPE_THREAD_MASTER, m->handler.copy); XFREE (MTYPE_THREAD_MASTER, m); pthread_mutex_lock (&cpu_record_mtx); @@ -1231,17 +1234,15 @@ thread_fetch (struct thread_master *m, struct thread *fetch) timer_wait = &timer_val; } - /* copy pollfds so we can unlock during blocking calls to poll() */ - struct pollfd pfds[m->handler.pfdsize]; unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp; - memcpy (pfds, m->handler.pfds, count * sizeof (struct pollfd)); + memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd)); pthread_mutex_unlock (&m->mtx); { - num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait); + num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait); } pthread_mutex_lock (&m->mtx); - + /* Signals should get quick treatment */ if (num < 0) { @@ -1263,7 +1264,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch) /* Got IO, process it */ if (num > 0) - thread_process_io (m, pfds, num, count); + thread_process_io (m, m->handler.copy, num, count); #if 0 /* If any threads were made ready above (I/O or foreground timer), diff --git a/lib/thread.h b/lib/thread.h index 753aa41ffd..608fb8b8c0 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -53,7 +53,10 @@ struct fd_handler nfds_t pfdcountsnmp; /* number of pfd that fit in the allocated space of pfds */ nfds_t pfdsize; + /* file descriptors to monitor for i/o */ struct pollfd *pfds; + /* chunk used for temp copy of pollfds */ + struct pollfd *copy; }; /* Master of the theads. */ -- cgit v1.2.3 From 82fc5591f440ee9810ce2c5d90d41a9f419062c8 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Mon, 5 Jun 2017 20:18:48 +0000 Subject: lib: allow infinite sleep in poll() If fd_poll() is called with no file descriptors, an incorrect check in the function prelude causes it to return instantly; for a thread that wishes to poll but has no file descriptors, this results in busy waiting. Desired behavior is to block. Signed-off-by: Quentin Young --- lib/thread.c | 3 --- 1 file changed, 3 deletions(-) (limited to 'lib/thread.c') diff --git a/lib/thread.c b/lib/thread.c index 848e39e1ae..2280b96316 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -647,9 +647,6 @@ static int fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, nfds_t count, struct timeval *timer_wait) { - if (count == 0) - return 0; - /* If timer_wait is null here, that means poll() should block indefinitely, * unless the thread_master has overriden it by setting ->selectpoll_timeout. * If the value is positive, it specifies the maximum number of milliseconds -- cgit v1.2.3 From a587d00bace8e675b897abda12e8b2a0fc752b11 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Thu, 8 Jun 2017 01:53:50 +0000 Subject: *: remove THREAD_BACKGROUND it's just an alias for a millisecond timer used in exactly nine places and serves only to complicate Signed-off-by: Quentin Young --- bgpd/bgp_updgrp.c | 4 +-- bgpd/rfapi/rfapi_import.c | 2 +- lib/thread.c | 74 +++---------------------------------------- lib/thread.h | 12 ++----- lib/workqueue.c | 4 +-- tests/lib/test_heavy_thread.c | 4 +-- zebra/zebra_fpm.c | 10 +++--- 7 files changed, 19 insertions(+), 91 deletions(-) (limited to 'lib/thread.c') diff --git a/bgpd/bgp_updgrp.c b/bgpd/bgp_updgrp.c index 04d262050f..287323bda9 100644 --- a/bgpd/bgp_updgrp.c +++ b/bgpd/bgp_updgrp.c @@ -1169,8 +1169,8 @@ update_subgroup_trigger_merge_check (struct update_subgroup *subgrp, return 0; subgrp->t_merge_check = NULL; - thread_add_background(bm->master, update_subgroup_merge_check_thread_cb, subgrp, 0, - &subgrp->t_merge_check); + thread_add_timer_msec (bm->master, update_subgroup_merge_check_thread_cb, subgrp, + 0, &subgrp->t_merge_check); SUBGRP_INCR_STAT (subgrp, merge_checks_triggered); diff --git a/bgpd/rfapi/rfapi_import.c b/bgpd/rfapi/rfapi_import.c index 741ad7d705..cefb9ef85a 100644 --- a/bgpd/rfapi/rfapi_import.c +++ b/bgpd/rfapi/rfapi_import.c @@ -3051,7 +3051,7 @@ rfapiBiStartWithdrawTimer ( lifetime_msec = (lifetime * 1000) + jitter; bi->extra->vnc.import.timer = NULL; - thread_add_background(bm->master, timer_service_func, wcb, lifetime_msec, + thread_add_timer_msec(bm->master, timer_service_func, wcb, lifetime_msec, &bi->extra->vnc.import.timer); } diff --git a/lib/thread.c b/lib/thread.c index 8c54ec6cea..bf3500fd8b 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -96,13 +96,12 @@ vty_out_cpu_thread_history(struct vty* vty, a->total_active, a->cpu.total/1000, a->cpu.total%1000, a->total_calls, a->cpu.total/a->total_calls, a->cpu.max, a->real.total/a->total_calls, a->real.max); - vty_out(vty, " %c%c%c%c%c%c %s%s", + vty_out(vty, " %c%c%c%c%c %s%s", a->types & (1 << THREAD_READ) ? 'R':' ', a->types & (1 << THREAD_WRITE) ? 'W':' ', a->types & (1 << THREAD_TIMER) ? 'T':' ', a->types & (1 << THREAD_EVENT) ? 'E':' ', a->types & (1 << THREAD_EXECUTE) ? 'X':' ', - a->types & (1 << THREAD_BACKGROUND) ? 'B' : ' ', a->funcname, VTY_NEWLINE); } @@ -195,10 +194,6 @@ DEFUN (show_thread_cpu, case 'X': filter |= (1 << THREAD_EXECUTE); break; - case 'b': - case 'B': - filter |= (1 << THREAD_BACKGROUND); - break; default: break; } @@ -287,10 +282,6 @@ DEFUN (clear_thread_cpu, case 'X': filter |= (1 << THREAD_EXECUTE); break; - case 'b': - case 'B': - filter |= (1 << THREAD_BACKGROUND); - break; default: break; } @@ -379,9 +370,8 @@ thread_master_create (void) /* Initialize the timer queues */ rv->timer = pqueue_create(); - rv->background = pqueue_create(); - rv->timer->cmp = rv->background->cmp = thread_timer_cmp; - rv->timer->update = rv->background->update = thread_timer_update; + rv->timer->cmp = thread_timer_cmp; + rv->timer->update = thread_timer_update; rv->spin = true; rv->handle_signals = true; rv->owner = pthread_self(); @@ -540,7 +530,6 @@ thread_master_free (struct thread_master *m) thread_list_free (m, &m->event); thread_list_free (m, &m->ready); thread_list_free (m, &m->unuse); - thread_queue_free (m, m->background); pthread_mutex_destroy (&m->mtx); close (m->io_pipe[0]); close (m->io_pipe[1]); @@ -757,7 +746,7 @@ funcname_thread_add_timer_timeval (struct thread_master *m, assert (m != NULL); - assert (type == THREAD_TIMER || type == THREAD_BACKGROUND); + assert (type == THREAD_TIMER); assert (time_relative); pthread_mutex_lock (&m->mtx); @@ -768,7 +757,7 @@ funcname_thread_add_timer_timeval (struct thread_master *m, return NULL; } - queue = ((type == THREAD_TIMER) ? m->timer : m->background); + queue = m->timer; thread = thread_get (m, type, func, arg, debugargpass); pthread_mutex_lock (&thread->mtx); @@ -836,31 +825,6 @@ funcname_thread_add_timer_tv (struct thread_master *m, t_ptr, debugargpass); } -/* Add a background thread, with an optional millisec delay */ -struct thread * -funcname_thread_add_background (struct thread_master *m, - int (*func) (struct thread *), void *arg, long delay, - struct thread **t_ptr, debugargdef) -{ - struct timeval trel; - - assert (m != NULL); - - if (delay) - { - trel.tv_sec = delay / 1000; - trel.tv_usec = 1000*(delay % 1000); - } - else - { - trel.tv_sec = 0; - trel.tv_usec = 0; - } - - return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND, arg, &trel, - t_ptr, debugargpass); -} - /* Add simple event thread. */ struct thread * funcname_thread_add_event (struct thread_master *m, @@ -957,9 +921,6 @@ thread_cancel (struct thread *thread) case THREAD_READY: list = &thread->master->ready; break; - case THREAD_BACKGROUND: - queue = thread->master->background; - break; default: goto done; break; @@ -1181,9 +1142,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch) struct thread *thread; struct timeval now; struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 }; - struct timeval timer_val_bg; struct timeval *timer_wait = &timer_val; - struct timeval *timer_wait_bg; do { @@ -1218,11 +1177,6 @@ thread_fetch (struct thread_master *m, struct thread *fetch) if (m->ready.count == 0) { timer_wait = thread_timer_wait (m->timer, &timer_val); - timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg); - - if (timer_wait_bg && - (!timer_wait || (timercmp (timer_wait, timer_wait_bg, >)))) - timer_wait = timer_wait_bg; } if (timer_wait && timer_wait->tv_sec < 0) @@ -1263,24 +1217,6 @@ thread_fetch (struct thread_master *m, struct thread *fetch) if (num > 0) thread_process_io (m, m->handler.copy, num, count); -#if 0 - /* If any threads were made ready above (I/O or foreground timer), - perhaps we should avoid adding background timers to the ready - list at this time. If this is code is uncommented, then background - timer threads will not run unless there is nothing else to do. */ - if ((thread = thread_trim_head (&m->ready)) != NULL) - { - fetch = thread_run (m, thread, fetch); - if (fetch->ref) - *fetch->ref = NULL; - pthread_mutex_unlock (&m->mtx); - return fetch; - } -#endif - - /* Background timer/events, lowest priority */ - thread_process_timers (m->background, &now); - if ((thread = thread_trim_head (&m->ready)) != NULL) { fetch = thread_run (m, thread, fetch); diff --git a/lib/thread.h b/lib/thread.h index 608fb8b8c0..86f839810f 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -68,7 +68,6 @@ struct thread_master struct thread_list event; struct thread_list ready; struct thread_list unuse; - struct pqueue *background; int io_pipe[2]; int fd_limit; struct fd_handler handler; @@ -131,9 +130,8 @@ struct cpu_thread_history #define THREAD_TIMER 2 #define THREAD_EVENT 3 #define THREAD_READY 4 -#define THREAD_BACKGROUND 5 -#define THREAD_UNUSED 6 -#define THREAD_EXECUTE 7 +#define THREAD_UNUSED 5 +#define THREAD_EXECUTE 6 /* Thread yield time. */ #define THREAD_YIELD_TIME_SLOT 10 * 1000L /* 10ms */ @@ -166,9 +164,6 @@ struct cpu_thread_history #define thread_add_event(m,f,a,v,t) funcname_thread_add_event(m,f,a,v,t,#f,__FILE__,__LINE__) #define thread_execute(m,f,a,v) funcname_thread_execute(m,f,a,v,#f,__FILE__,__LINE__) -/* The 4th arg to thread_add_background is the # of milliseconds to delay. */ -#define thread_add_background(m,f,a,v,t) funcname_thread_add_background(m,f,a,v,t,#f,__FILE__,__LINE__) - /* Prototypes. */ extern struct thread_master *thread_master_create (void); extern void thread_master_free (struct thread_master *); @@ -189,9 +184,6 @@ extern struct thread * funcname_thread_add_timer_tv (struct thread_master *, extern struct thread * funcname_thread_add_event (struct thread_master *, int (*)(struct thread *), void *, int, struct thread **, debugargdef); -extern struct thread * funcname_thread_add_background (struct thread_master *, - int (*)(struct thread *), void *, long, struct thread **, debugargdef); - extern void funcname_thread_execute (struct thread_master *, int (*)(struct thread *), void *, int, debugargdef); #undef debugargdef diff --git a/lib/workqueue.c b/lib/workqueue.c index f992588399..8a06502894 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -126,8 +126,8 @@ work_queue_schedule (struct work_queue *wq, unsigned int delay) && (listcount (wq->items) > 0) ) { wq->thread = NULL; - thread_add_background(wq->master, work_queue_run, wq, delay, - &wq->thread); + thread_add_timer_msec (wq->master, work_queue_run, wq, delay, + &wq->thread); /* set thread yield time, if needed */ if (wq->thread && wq->spec.yield != THREAD_YIELD_TIME_SLOT) thread_set_yield_time (wq->thread, wq->spec.yield); diff --git a/tests/lib/test_heavy_thread.c b/tests/lib/test_heavy_thread.c index 3b85619d3a..b39b3b7d46 100644 --- a/tests/lib/test_heavy_thread.c +++ b/tests/lib/test_heavy_thread.c @@ -90,7 +90,7 @@ clear_something (struct thread *thread) ws->i++; if (thread_should_yield(thread)) { - thread_add_background(master, clear_something, ws, 0, NULL); + thread_add_timer_msec (master, clear_something, ws, 0, NULL); return 0; } } @@ -134,7 +134,7 @@ DEFUN (clear_foo, ws->vty = vty; ws->i = ITERS_FIRST; - thread_add_background(master, clear_something, ws, 0, NULL); + thread_add_timer_msec (master, clear_something, ws, 0, NULL); return CMD_SUCCESS; } diff --git a/zebra/zebra_fpm.c b/zebra/zebra_fpm.c index 37068c1fae..00b604c265 100644 --- a/zebra/zebra_fpm.c +++ b/zebra/zebra_fpm.c @@ -564,8 +564,8 @@ zfpm_conn_up_thread_cb (struct thread *thread) zfpm_g->stats.t_conn_up_yields++; zfpm_rnodes_iter_pause (iter); zfpm_g->t_conn_up = NULL; - thread_add_background(zfpm_g->master, zfpm_conn_up_thread_cb, 0, 0, - &zfpm_g->t_conn_up); + thread_add_timer_msec (zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0, + &zfpm_g->t_conn_up); return 0; } @@ -598,7 +598,7 @@ zfpm_connection_up (const char *detail) zfpm_debug ("Starting conn_up thread"); zfpm_g->t_conn_up = NULL; - thread_add_background(zfpm_g->master, zfpm_conn_up_thread_cb, 0, 0, + thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0, &zfpm_g->t_conn_up); zfpm_g->stats.t_conn_up_starts++; } @@ -688,7 +688,7 @@ zfpm_conn_down_thread_cb (struct thread *thread) zfpm_g->stats.t_conn_down_yields++; zfpm_rnodes_iter_pause (iter); zfpm_g->t_conn_down = NULL; - thread_add_background(zfpm_g->master, zfpm_conn_down_thread_cb, 0, 0, + thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0, &zfpm_g->t_conn_down); return 0; } @@ -736,7 +736,7 @@ zfpm_connection_down (const char *detail) zfpm_debug ("Starting conn_down thread"); zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter); zfpm_g->t_conn_down = NULL; - thread_add_background(zfpm_g->master, zfpm_conn_down_thread_cb, 0, 0, + thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0, &zfpm_g->t_conn_down); zfpm_g->stats.t_conn_down_starts++; -- cgit v1.2.3