From 0a95a0d0b60e9f8ff28ccb28179f59f68e421346 Mon Sep 17 00:00:00 2001
From: Donald Sharp
Date: Fri, 4 Mar 2016 06:28:29 +0000
Subject: [PATCH] lib: Add ability to use poll() instead of select

This patch originated w/ Hannes Hofer.

I've taken the patch, fixed some bugs, and reworked the code to allow
both poll and select to be chosen at compile time.

Signed-off-by: Donald Sharp
---
Review notes (below the "---" marker, so not part of the commit message):

 * This mail was reflowed from a copy whose newlines had been collapsed
   and whose "<...>" spans had been stripped.  The lost text was restored
   so that every hunk matches its @@ line counts: "#include <poll.h>",
   the "i<..." loop conditions, and the FD_ISSET loop of
   add_snmp_pollfds.  Context-line whitespace is best effort.
 * pfds is now allocated, grown and freed with one memory type
   (MTYPE_THREAD_MASTER); it used to be malloc() + XREALLOC(MTYPE_THREAD)
   + XFREE(MTYPE_THREAD_MASTER).
 * add_snmp_pollfds grows the array when the next slot index equals
   pfdsize (was ">", one slot too late) and tests realloc_pfds() against
   its real failure value 0 (was "< 0").
 * fd_select no longer hands poll() pfdcount + pfdcountsnmp entries:
   pfdcountsnmp already starts from pfdcount, so the sum overran the
   valid part of the array.
 * The SNMP readiness test uses "revents & POLLIN" instead of "==".
 * The HAVE_POLL config.h description no longer says "systemd".
 * Still open: thread_cancel_read_write() drops the whole pollfd slot
   even if a thread for the other direction is registered on that fd.

 configure.ac |   6 ++
 lib/thread.c | 292 +++++++++++++++++++++++++++++++++++++++++++++------
 lib/thread.h |  25 ++++-
 3 files changed, 286 insertions(+), 37 deletions(-)

diff --git a/configure.ac b/configure.ac
index 74020ad736..d7b0ffe8f9 100755
--- a/configure.ac
+++ b/configure.ac
@@ -300,6 +300,8 @@ AC_ARG_ENABLE(fpm,
 [  --enable-fpm            enable Forwarding Plane Manager support])
 AC_ARG_ENABLE(systemd,
 [  --enable-systemd        enable Systemd support])
+AC_ARG_ENABLE(poll,
+[  --enable-poll           enable usage of Poll instead of select])
 AC_ARG_ENABLE(werror,
   AS_HELP_STRING([--enable-werror], [enable -Werror (recommended for developers only)]))
 AC_ARG_ENABLE(cumulus,
@@ -331,6 +333,10 @@ if test "${enable_systemd}" = "yes" ; then
   LIBS="$LIBS -lsystemd "
 fi
 
+if test "${enable_poll}" = "yes" ; then
+  AC_DEFINE(HAVE_POLL,,Compile poll support in)
+fi
+
 if test "${enable_cumulus}" = "yes" ; then
   AC_DEFINE(HAVE_CUMULUS,,Compile Special Cumulus Code in)
 fi
diff --git a/lib/thread.c b/lib/thread.c
index 9c009ead1b..bb524f0ad0 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -57,7 +57,6 @@ static unsigned short timers_inited;
 
 static struct hash *cpu_record = NULL;
 
-
 /* Adjust so that tv_usec is in the range [0,TIMER_SECOND_MICRO).
    And change negative values to 0. */
 static struct timeval
@@ -520,7 +519,7 @@ thread_timer_update(void *node, int actual_position)
 
 /* Allocate new thread master.  */
 struct thread_master *
-thread_master_create ()
+thread_master_create (void)
 {
   struct thread_master *rv;
   struct rlimit limit;
@@ -560,6 +559,12 @@ thread_master_create ()
   rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
   rv->timer->update = rv->background->update = thread_timer_update;
 
+#if defined(HAVE_POLL)
+  rv->handler.pfdsize = 64;
+  rv->handler.pfdcount = 0;
+  rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
+                              sizeof (struct pollfd) * rv->handler.pfdsize);
+#endif
   return rv;
 }
 
@@ -710,7 +715,10 @@ thread_master_free (struct thread_master *m)
   thread_list_free (m, &m->ready);
   thread_list_free (m, &m->unuse);
   thread_queue_free (m, m->background);
-  
+
+#if defined(HAVE_POLL)
+  XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
+#endif
   XFREE (MTYPE_THREAD_MASTER, m);
 
   if (cpu_record)
@@ -783,27 +791,123 @@ thread_get (struct thread_master *m, u_char type,
   return thread;
 }
 
+#if defined (HAVE_POLL)
+
+#define fd_copy_fd_set(X) (X)
+
+static short
+realloc_pfds (struct thread_master *m, int fd)
+{
+  size_t oldpfdlen = m->handler.pfdsize * sizeof(struct pollfd);
+  void *newpfd = NULL;
+
+  m->handler.pfdsize *= 2;
+  newpfd = XREALLOC (MTYPE_THREAD_MASTER, m->handler.pfds, m->handler.pfdsize * sizeof(struct pollfd));
+  if (newpfd == NULL)
+    {
+      close(fd);
+      zlog (NULL, LOG_ERR, "failed to allocate space for pollfds");
+      return 0;
+    }
+  memset((struct pollfd*)newpfd + (m->handler.pfdsize / 2), 0, oldpfdlen);
+  m->handler.pfds = (struct pollfd*)newpfd;
+  return 1;
+}
+
+/* generic add thread function */
+static struct thread *
+generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
+		   void *arg, int fd, const char* funcname, int dir)
+{
+  struct thread *thread;
+
+  u_char type;
+  short int event;
+
+  if (dir == THREAD_READ)
+    {
+      event = (POLLIN | POLLHUP);
+      type = THREAD_READ;
+    }
+  else
+    {
+      event = (POLLOUT | POLLHUP);
+      type = THREAD_WRITE;
+    }
+
+  nfds_t queuepos = m->handler.pfdcount;
+  nfds_t i=0;
+  for (i=0; i<m->handler.pfdcount; i++)
+    if (m->handler.pfds[i].fd == fd)
+      {
+        queuepos = i;
+        break;
+      }
+
+  /* is there enough space for a new fd? */
+  if (queuepos >= m->handler.pfdsize)
+    if (realloc_pfds(m, fd) == 0)
+      return NULL;
+
+  thread = thread_get (m, type, func, arg, funcname);
+  m->handler.pfds[queuepos].fd = fd;
+  m->handler.pfds[queuepos].events |= event;
+  if (queuepos == m->handler.pfdcount)
+    m->handler.pfdcount++;
+
+  return thread;
+}
+#else
+
 #define fd_copy_fd_set(X) (X)
+#endif
 
 static int
-fd_select (int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *t)
+fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
 {
-  return(select(size, read, write, except, t));
+  int num;
+#if defined(HAVE_POLL)
+  /* recalc timeout for poll. Attention NULL pointer is no timeout with
+  select, where with poll no timeout is -1 */
+  int timeout = -1;
+  if (timer_wait != NULL)
+    timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
+
+  num = poll (m->handler.pfds, (m->handler.pfdcountsnmp > m->handler.pfdcount) ? m->handler.pfdcountsnmp : m->handler.pfdcount, timeout);
+#else
+  num = select (size, read, write, except, timer_wait);
+#endif
+
+  return num;
 }
 
 static int
-fd_is_set (int fd, thread_fd_set *fdset)
+fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
 {
-  return FD_ISSET (fd, fdset);
+#if defined(HAVE_POLL)
+  return 1;
+#else
+  return FD_ISSET (THREAD_FD (thread), fdset);
+#endif
 }
 
 static int
-fd_clear_read_write (int fd, thread_fd_set *fdset)
+fd_clear_read_write (struct thread *thread)
 {
+#if !defined(HAVE_POLL)
+  thread_fd_set *fdset = NULL;
+  int fd = THREAD_FD (thread);
+
+  if (thread->type == THREAD_READ)
+    fdset = &thread->master->handler.readfd;
+  else
+    fdset = &thread->master->handler.writefd;
+
   if (!FD_ISSET (fd, fdset))
     return 0;
 
   FD_CLR (fd, fdset);
+#endif
   return 1;
 }
 
@@ -813,13 +917,21 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
 		 int (*func) (struct thread *), void *arg, int fd, const char* funcname)
 {
   struct thread *thread = NULL;
-  thread_fd_set *fdset = NULL;
 
+#if !defined(HAVE_POLL)
+  thread_fd_set *fdset = NULL;
   if (dir == THREAD_READ)
-    fdset = &m->readfd;
+    fdset = &m->handler.readfd;
   else
-    fdset = &m->writefd;
+    fdset = &m->handler.writefd;
+#endif
 
+#if defined (HAVE_POLL)
+  thread = generic_thread_add(m, func, arg, fd, funcname, dir);
+
+  if (thread == NULL)
+    return NULL;
+#else
   if (FD_ISSET (fd, fdset))
     {
       zlog (NULL, LOG_WARNING, "There is already %s fd [%d]", (dir = THREAD_READ) ? "read" : "write", fd);
@@ -827,8 +939,9 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
     }
 
   FD_SET (fd, fdset);
-
   thread = thread_get (m, dir, func, arg, funcname);
+#endif
+
   thread->u.fd = fd;
   if (dir == THREAD_READ)
     thread_add_fd (m->read, thread);
@@ -945,6 +1058,27 @@ funcname_thread_add_event (struct thread_master *m,
   return thread;
 }
 
+static void
+thread_cancel_read_write (struct thread *thread)
+{
+#if defined(HAVE_POLL)
+  nfds_t i;
+
+  for (i=0;i<thread->master->handler.pfdcount;++i)
+    if (thread->master->handler.pfds[i].fd == thread->u.fd)
+      {
+        /* remove thread fds from pfd list */
+        memmove(thread->master->handler.pfds+i,
+                thread->master->handler.pfds+i+1,
+                (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
+        i--;
+        thread->master->handler.pfdcount--;
+      }
+#endif
+
+  fd_clear_read_write (thread);
+}
+
 /* Cancel thread from scheduler. */
 void
 thread_cancel (struct thread *thread)
@@ -956,11 +1090,11 @@ thread_cancel (struct thread *thread)
   switch (thread->type)
     {
     case THREAD_READ:
-      assert (fd_clear_read_write (thread->u.fd, &thread->master->readfd));
+      thread_cancel_read_write (thread);
       thread_array = thread->master->read;
       break;
     case THREAD_WRITE:
-      assert (fd_clear_read_write (thread->u.fd, &thread->master->writefd));
+      thread_cancel_read_write (thread);
       thread_array = thread->master->write;
       break;
     case THREAD_TIMER:
@@ -1070,47 +1204,117 @@ thread_run (struct thread_master *m, struct thread *thread,
 }
 
 static int
-thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset)
+thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
 {
-  thread_fd_set *mfdset = NULL;
   struct thread **thread_array;
 
   if (!thread)
     return 0;
 
   if (thread->type == THREAD_READ)
-    {
-      mfdset = &m->readfd;
-      thread_array = m->read;
-    }
+    thread_array = m->read;
   else
-    {
-      mfdset = &m->writefd;
-      thread_array = m->write;
-    }
+    thread_array = m->write;
 
-  if (fd_is_set (THREAD_FD (thread), fdset))
+  if (fd_is_set (thread, fdset, pos))
     {
-      fd_clear_read_write (THREAD_FD (thread), mfdset);
+      fd_clear_read_write (thread);
       thread_delete_fd (thread_array, thread);
       thread_list_add (&m->ready, thread);
       thread->type = THREAD_READY;
+#if defined(HAVE_POLL)
+      thread->master->handler.pfds[pos].events &= ~(state);
+#endif
       return 1;
     }
   return 0;
 }
 
-static int
+#if defined(HAVE_POLL)
+
+#if defined(HAVE_SNMP)
+/* add snmp fds to poll set */
+static void
+add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int fdsetsize)
+{
+  int i;
+  m->handler.pfdcountsnmp = m->handler.pfdcount;
+  /* cycle through fds and add necessary fds to poll set */
+  for (i=0;i<fdsetsize;++i)
+    {
+      if (FD_ISSET(i, snmpfds))
+        {
+          if (m->handler.pfdcountsnmp >= m->handler.pfdsize)
+            if (realloc_pfds(m, i) == 0)
+              return;
+
+          m->handler.pfds[m->handler.pfdcountsnmp].fd = i;
+          m->handler.pfds[m->handler.pfdcountsnmp].events = POLLIN;
+          m->handler.pfdcountsnmp++;
+        }
+    }
+}
+#endif
+
+/* check poll events */
+static void
+check_pollfds(struct thread_master *m, fd_set *readfd, int num)
+{
+  nfds_t i = 0;
+  int ready = 0;
+  for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
+    {
+      /* no event for current fd? immediately continue */
+      if(m->handler.pfds[i].revents == 0)
+        continue;
+
+      /* remove fd from list on POLLNVAL */
+      if (m->handler.pfds[i].revents & POLLNVAL)
+        {
+           memmove(m->handler.pfds+i,
+                   m->handler.pfds+i+1,
+                   (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
+           m->handler.pfdcount--;
+           i--;
+           continue;
+        }
+
+      /* POLLIN / POLLOUT process event */
+      if (m->handler.pfds[i].revents & POLLIN)
+        ready += thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
+      if (m->handler.pfds[i].revents & POLLOUT)
+        ready += thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
+
+      /* remove fd from list on POLLHUP after other event is processed */
+      if (m->handler.pfds[i].revents & POLLHUP)
+        {
+           memmove(m->handler.pfds+i,
+                   m->handler.pfds+i+1,
+                   (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
+           m->handler.pfdcount--;
+           i--;
+           ready++;
+        }
+      else
+          m->handler.pfds[i].revents = 0;
+    }
+}
+#endif
+
+static void
 thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
 {
+#if defined (HAVE_POLL)
+  check_pollfds (m, rset, num);
+#else
   int ready = 0, index;
 
   for (index = 0; index < m->fd_limit && ready < num; ++index)
     {
-      ready += thread_process_fds_helper (m, m->read[index], rset);
-      ready += thread_process_fds_helper (m, m->write[index], wset);
+      ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
+      ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
     }
-  return num - ready;
+#endif
 }
 
 /* Add all timers that have popped to the ready list. */
@@ -1193,9 +1397,11 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
       thread_process (&m->event);
 
       /* Structure copy.  */
-      readfd = fd_copy_fd_set(m->readfd);
-      writefd = fd_copy_fd_set(m->writefd);
-      exceptfd = fd_copy_fd_set(m->exceptfd);
+#if !defined(HAVE_POLL)
+      readfd = fd_copy_fd_set(m->handler.readfd);
+      writefd = fd_copy_fd_set(m->handler.writefd);
+      exceptfd = fd_copy_fd_set(m->handler.exceptfd);
+#endif
 
       /* Calculate select wait timer if nothing else to do */
       if (m->ready.count == 0)
@@ -1224,12 +1430,20 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
               snmpblock = 0;
               memcpy(&snmp_timer_wait, timer_wait, sizeof(struct timeval));
             }
+#if defined(HAVE_POLL)
+          /* clear fdset since there are no other fds in fd_set,
+             then add injected fds from snmp_select_info into pollset */
+          FD_ZERO(&readfd);
+#endif
           snmp_select_info(&fdsetsize, &readfd, &snmp_timer_wait, &snmpblock);
+#if defined(HAVE_POLL)
+          add_snmp_pollfds(m, &readfd, fdsetsize);
+#endif
           if (snmpblock == 0)
             timer_wait = &snmp_timer_wait;
         }
 #endif
-      num = fd_select (FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
+      num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
 
       /* Signals should get quick treatment */
       if (num < 0)
@@ -1241,6 +1455,16 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
         }
 
 #if defined HAVE_SNMP && defined SNMP_AGENTX
+#if defined(HAVE_POLL)
+      /* re-enter pollfds in fd_set for handling in snmp_read */
+      FD_ZERO(&readfd);
+      nfds_t i;
+      for (i = m->handler.pfdcount; i < m->handler.pfdcountsnmp; ++i)
+        {
+          if (m->handler.pfds[i].revents & POLLIN)
+            FD_SET(m->handler.pfds[i].fd, &readfd);
+        }
+#endif
       if (agentx_enabled)
         {
           if (num > 0)
diff --git a/lib/thread.h b/lib/thread.h
index cd5cf16d55..c8c8c19323 100644
--- a/lib/thread.h
+++ b/lib/thread.h
@@ -52,6 +52,27 @@ struct pqueue;
  */
 typedef fd_set thread_fd_set;
 
+#if defined(HAVE_POLL)
+#include <poll.h>
+struct fd_handler
+{
+  /* number of pfd stored in pfds */
+  nfds_t pfdcount;
+  /* number of pfd stored in pfds + number of snmp pfd */
+  nfds_t pfdcountsnmp;
+  /* number of pfd that fit in the allocated space of pfds */
+  nfds_t pfdsize;
+  struct pollfd *pfds;
+};
+#else
+struct fd_handler
+{
+  fd_set readfd;
+  fd_set writefd;
+  fd_set exceptfd;
+};
+#endif
+
 /* Master of the theads. */
 struct thread_master
 {
@@ -63,9 +84,7 @@ struct thread_master
   struct thread_list unuse;
   struct pqueue *background;
   int fd_limit;
-  thread_fd_set readfd;
-  thread_fd_set writefd;
-  thread_fd_set exceptfd;
+  struct fd_handler handler;
   unsigned long alloc;
 };
 
-- 
2.39.5