summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/thread.c141
1 files changed, 57 insertions, 84 deletions
diff --git a/lib/thread.c b/lib/thread.c
index bb524f0ad0..8c5c7487e7 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -560,7 +560,7 @@ thread_master_create (void)
rv->timer->update = rv->background->update = thread_timer_update;
#if defined(HAVE_POLL)
- rv->handler.pfdsize = 64;
+ rv->handler.pfdsize = rv->fd_limit;
rv->handler.pfdcount = 0;
rv->handler.pfds = (struct pollfd *) malloc (sizeof (struct pollfd) * rv->handler.pfdsize);
memset (rv->handler.pfds, 0, sizeof (struct pollfd) * rv->handler.pfdsize);
@@ -791,53 +791,26 @@ thread_get (struct thread_master *m, u_char type,
return thread;
}
-#if defined (HAVE_POLL)
-
#define fd_copy_fd_set(X) (X)
-static short
-realloc_pfds (struct thread_master *m, int fd)
-{
- size_t oldpfdlen = m->handler.pfdsize * sizeof(struct pollfd);
- void *newpfd = NULL;
-
- m->handler.pfdsize *= 2;
- newpfd = XREALLOC (MTYPE_THREAD, m->handler.pfds, m->handler.pfdsize * sizeof(struct pollfd));
- if (newpfd == NULL)
- {
- close(fd);
- zlog (NULL, LOG_ERR, "failed to allocate space for pollfds");
- return 0;
- }
- memset((struct pollfd*)newpfd + (m->handler.pfdsize / 2), 0, oldpfdlen);
- m->handler.pfds = (struct pollfd*)newpfd;
- return 1;
-}
-
/* generic add thread function */
static struct thread *
-generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
- void *arg, int fd, const char* funcname, int dir)
+generic_thread_add (struct thread_master *m, int (*func) (struct thread *),
+ void *arg, int fd, const char* funcname, int dir)
{
struct thread *thread;
- u_char type;
+#if defined (HAVE_POLL)
short int event;
if (dir == THREAD_READ)
- {
- event = (POLLIN | POLLHUP);
- type = THREAD_READ;
- }
+ event = (POLLIN | POLLHUP);
else
- {
- event = (POLLOUT | POLLHUP);
- type = THREAD_WRITE;
- }
+ event = (POLLOUT | POLLHUP);
nfds_t queuepos = m->handler.pfdcount;
nfds_t i=0;
- for (i=0; i<m->handler.pfdcount; i++)
+ for (i=0; i < m->handler.pfdcount; i++)
if (m->handler.pfds[i].fd == fd)
{
queuepos = i;
@@ -845,23 +818,31 @@ generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
}
/* is there enough space for a new fd? */
- if (queuepos >= m->handler.pfdsize)
- if (realloc_pfds(m, fd) == 0)
- return NULL;
+ assert (queuepos < m->handler.pfdsize);
- thread = thread_get (m, type, func, arg, funcname);
m->handler.pfds[queuepos].fd = fd;
m->handler.pfds[queuepos].events |= event;
if (queuepos == m->handler.pfdcount)
m->handler.pfdcount++;
-
- return thread;
-}
#else
+ if (dir == THREAD_READ)
+ fdset = &m->handler.readfd;
+ else
+ fdset = &m->handler.writefd;
-#define fd_copy_fd_set(X) (X)
+ if (FD_ISSET (fd, fdset))
+ {
+ zlog (NULL, LOG_WARNING, "There is already %s fd [%d]", (dir = THREAD_READ) ? "read" : "write", fd);
+ return NULL;
+ }
+
+ FD_SET (fd, fdset);
#endif
+ thread = thread_get (m, dir, func, arg, funcname);
+ return thread;
+}
+
static int
fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
{
@@ -896,6 +877,7 @@ fd_clear_read_write (struct thread *thread)
{
#if !defined(HAVE_POLL)
thread_fd_set *fdset = NULL;
+
int fd = THREAD_FD (thread);
if (thread->type == THREAD_READ)
@@ -918,29 +900,7 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
{
struct thread *thread = NULL;
-#if !defined(HAVE_POLL)
- thread_fd_set *fdset = NULL;
- if (dir == THREAD_READ)
- fdset = &m->handler.readfd;
- else
- fdset = &m->handler.writefd;
-#endif
-
-#if defined (HAVE_POLL)
- thread = generic_thread_add(m, func, arg, fd, funcname, dir);
-
- if (thread == NULL)
- return NULL;
-#else
- if (FD_ISSET (fd, fdset))
- {
- zlog (NULL, LOG_WARNING, "There is already %s fd [%d]", (dir = THREAD_READ) ? "read" : "write", fd);
- return NULL;
- }
-
- FD_SET (fd, fdset);
- thread = thread_get (m, dir, func, arg, funcname);
-#endif
+ thread = generic_thread_add (m, func, arg, fd, funcname, dir);
thread->u.fd = fd;
if (dir == THREAD_READ)
@@ -1244,9 +1204,7 @@ add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int fdsetsize)
{
if (FD_ISSET(i, snmpfds))
{
- if (m->handler.pfdcountsnmp > m->handler.pfdsize)
- if (realloc_pfds(m, i) < 0)
- return;
+ assert (m->handler.pfdcountsnmp <= m->handler.pfdsize);
m->handler.pfds[m->handler.pfdcountsnmp].fd = i;
m->handler.pfds[m->handler.pfdcountsnmp].events = POLLIN;
@@ -1262,7 +1220,9 @@ check_pollfds(struct thread_master *m, fd_set *readfd, int num)
{
nfds_t i = 0;
int ready = 0;
- for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
+ int handled = 0;
+
+ for (i = 0; i < m->handler.pfdcount && handled < num ; ++i)
{
/* no event for current fd? immideatly continue */
if(m->handler.pfds[i].revents == 0)
@@ -1271,32 +1231,45 @@ check_pollfds(struct thread_master *m, fd_set *readfd, int num)
/* remove fd from list on POLLNVAL */
if (m->handler.pfds[i].revents & POLLNVAL)
{
- memmove(m->handler.pfds+i,
- m->handler.pfds+i+1,
- (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
- m->handler.pfdcount--;
- i--;
- continue;
+ memmove (m->handler.pfds+i,
+ m->handler.pfds+i+1,
+ (m->handler.pfdsize-i-1) * sizeof (struct pollfd));
+ m->handler.pfdcount--;
+ i--;
+ continue;
}
/* POLLIN / POLLOUT process event */
if (m->handler.pfds[i].revents & POLLIN)
- ready += thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
+ ready += thread_process_fds_helper (m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
if (m->handler.pfds[i].revents & POLLOUT)
- ready += thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
+ ready += thread_process_fds_helper (m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
/* remove fd from list on POLLHUP after other event is processed */
if (m->handler.pfds[i].revents & POLLHUP)
{
- memmove(m->handler.pfds+i,
- m->handler.pfds+i+1,
- (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
- m->handler.pfdcount--;
- i--;
- ready++;
+ memmove (m->handler.pfds+i,
+ m->handler.pfds+i+1,
+ (m->handler.pfdsize-i-1) * sizeof (struct pollfd));
+ m->handler.pfdcount--;
+ i--;
+ ready++;
}
else
- m->handler.pfds[i].revents = 0;
+ m->handler.pfds[i].revents = 0;
+
+ /*
+ * WTF? We get 1 event per revents being non 0
+ * But we could possibly be counting up multiple
+ * times for each revents handled. So
+ * note that we've handled any revents and
+ * make it count once;
+ */
+ if (ready)
+ {
+ handled++;
+ ready = 0;
+ }
}
}
#endif