diff options
| -rw-r--r-- | lib/thread.c | 141 |
1 files changed, 57 insertions, 84 deletions
diff --git a/lib/thread.c b/lib/thread.c index bb524f0ad0..8c5c7487e7 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -560,7 +560,7 @@ thread_master_create (void) rv->timer->update = rv->background->update = thread_timer_update; #if defined(HAVE_POLL) - rv->handler.pfdsize = 64; + rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; rv->handler.pfds = (struct pollfd *) malloc (sizeof (struct pollfd) * rv->handler.pfdsize); memset (rv->handler.pfds, 0, sizeof (struct pollfd) * rv->handler.pfdsize); @@ -791,53 +791,26 @@ thread_get (struct thread_master *m, u_char type, return thread; } -#if defined (HAVE_POLL) - #define fd_copy_fd_set(X) (X) -static short -realloc_pfds (struct thread_master *m, int fd) -{ - size_t oldpfdlen = m->handler.pfdsize * sizeof(struct pollfd); - void *newpfd = NULL; - - m->handler.pfdsize *= 2; - newpfd = XREALLOC (MTYPE_THREAD, m->handler.pfds, m->handler.pfdsize * sizeof(struct pollfd)); - if (newpfd == NULL) - { - close(fd); - zlog (NULL, LOG_ERR, "failed to allocate space for pollfds"); - return 0; - } - memset((struct pollfd*)newpfd + (m->handler.pfdsize / 2), 0, oldpfdlen); - m->handler.pfds = (struct pollfd*)newpfd; - return 1; -} - /* generic add thread function */ static struct thread * -generic_thread_add(struct thread_master *m, int (*func) (struct thread *), - void *arg, int fd, const char* funcname, int dir) +generic_thread_add (struct thread_master *m, int (*func) (struct thread *), + void *arg, int fd, const char* funcname, int dir) { struct thread *thread; - u_char type; +#if defined (HAVE_POLL) short int event; if (dir == THREAD_READ) - { - event = (POLLIN | POLLHUP); - type = THREAD_READ; - } + event = (POLLIN | POLLHUP); else - { - event = (POLLOUT | POLLHUP); - type = THREAD_WRITE; - } + event = (POLLOUT | POLLHUP); nfds_t queuepos = m->handler.pfdcount; nfds_t i=0; - for (i=0; i<m->handler.pfdcount; i++) + for (i=0; i < m->handler.pfdcount; i++) if (m->handler.pfds[i].fd == fd) { queuepos = i; @@ -845,23 +818,31 @@ generic_thread_add(struct thread_master *m, int (*func) (struct thread *), } /* is there enough space for a new fd? */ - if (queuepos >= m->handler.pfdsize) - if (realloc_pfds(m, fd) == 0) - return NULL; + assert (queuepos < m->handler.pfdsize); - thread = thread_get (m, type, func, arg, funcname); m->handler.pfds[queuepos].fd = fd; m->handler.pfds[queuepos].events |= event; if (queuepos == m->handler.pfdcount) m->handler.pfdcount++; - - return thread; -} #else + if (dir == THREAD_READ) + fdset = &m->handler.readfd; + else + fdset = &m->handler.writefd; -#define fd_copy_fd_set(X) (X) + if (FD_ISSET (fd, fdset)) + { + zlog (NULL, LOG_WARNING, "There is already %s fd [%d]", (dir = THREAD_READ) ? "read" : "write", fd); + return NULL; + } + + FD_SET (fd, fdset); #endif + thread = thread_get (m, dir, func, arg, funcname); + return thread; +} + static int fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait) { @@ -896,6 +877,7 @@ fd_clear_read_write (struct thread *thread) { #if !defined(HAVE_POLL) thread_fd_set *fdset = NULL; + int fd = THREAD_FD (thread); if (thread->type == THREAD_READ) @@ -918,29 +900,7 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, { struct thread *thread = NULL; -#if !defined(HAVE_POLL) - thread_fd_set *fdset = NULL; - if (dir == THREAD_READ) - fdset = &m->handler.readfd; - else - fdset = &m->handler.writefd; -#endif - -#if defined (HAVE_POLL) - thread = generic_thread_add(m, func, arg, fd, funcname, dir); - - if (thread == NULL) - return NULL; -#else - if (FD_ISSET (fd, fdset)) - { - zlog (NULL, LOG_WARNING, "There is already %s fd [%d]", (dir = THREAD_READ) ? "read" : "write", fd); - return NULL; - } - - FD_SET (fd, fdset); - thread = thread_get (m, dir, func, arg, funcname); -#endif + thread = generic_thread_add (m, func, arg, fd, funcname, dir); thread->u.fd = fd; if (dir == THREAD_READ) @@ -1244,9 +1204,7 @@ add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int fdsetsize) { if (FD_ISSET(i, snmpfds)) { - if (m->handler.pfdcountsnmp > m->handler.pfdsize) - if (realloc_pfds(m, i) < 0) - return; + assert (m->handler.pfdcountsnmp <= m->handler.pfdsize); m->handler.pfds[m->handler.pfdcountsnmp].fd = i; m->handler.pfds[m->handler.pfdcountsnmp].events = POLLIN; @@ -1262,7 +1220,9 @@ check_pollfds(struct thread_master *m, fd_set *readfd, int num) { nfds_t i = 0; int ready = 0; - for (i = 0; i < m->handler.pfdcount && ready < num ; ++i) + int handled = 0; + + for (i = 0; i < m->handler.pfdcount && handled < num ; ++i) { /* no event for current fd? immideatly continue */ if(m->handler.pfds[i].revents == 0) @@ -1271,32 +1231,45 @@ check_pollfds(struct thread_master *m, fd_set *readfd, int num) /* remove fd from list on POLLNVAL */ if (m->handler.pfds[i].revents & POLLNVAL) { - memmove(m->handler.pfds+i, - m->handler.pfds+i+1, - (m->handler.pfdsize-i-1) * sizeof(struct pollfd)); - m->handler.pfdcount--; - i--; - continue; + memmove (m->handler.pfds+i, + m->handler.pfds+i+1, + (m->handler.pfdsize-i-1) * sizeof (struct pollfd)); + m->handler.pfdcount--; + i--; + continue; } /* POLLIN / POLLOUT process event */ if (m->handler.pfds[i].revents & POLLIN) - ready += thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i); + ready += thread_process_fds_helper (m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i); if (m->handler.pfds[i].revents & POLLOUT) - ready += thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i); + ready += thread_process_fds_helper (m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i); /* remove fd from list on POLLHUP after other event is processed */ if (m->handler.pfds[i].revents & POLLHUP) { - memmove(m->handler.pfds+i, - m->handler.pfds+i+1, - (m->handler.pfdsize-i-1) * sizeof(struct pollfd)); - m->handler.pfdcount--; - i--; - ready++; + memmove (m->handler.pfds+i, + m->handler.pfds+i+1, + (m->handler.pfdsize-i-1) * sizeof (struct pollfd)); + m->handler.pfdcount--; + i--; + ready++; } else - m->handler.pfds[i].revents = 0; + m->handler.pfds[i].revents = 0; + + /* + * WTF? We get 1 event per revents being non 0 + * But we could possibly be counting up multiple + * times for each revents handled. So + * note that we've handled any revents and + * make it count once; + */ + if (ready) + { + handled++; + ready = 0; + } } } #endif |
