static struct hash *cpu_record = NULL;
-
/* Adjust so that tv_usec is in the range [0,TIMER_SECOND_MICRO).
And change negative values to 0. */
static struct timeval
/* Allocate new thread master. */
struct thread_master *
-thread_master_create ()
+thread_master_create (void)
{
struct thread_master *rv;
struct rlimit limit;
rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
rv->timer->update = rv->background->update = thread_timer_update;
+#if defined(HAVE_POLL)
+ rv->handler.pfdsize = 64;
+ rv->handler.pfdcount = 0;
+ rv->handler.pfds = (struct pollfd *) malloc (sizeof (struct pollfd) * rv->handler.pfdsize);
+ memset (rv->handler.pfds, 0, sizeof (struct pollfd) * rv->handler.pfdsize);
+#endif
return rv;
}
thread_list_free (m, &m->ready);
thread_list_free (m, &m->unuse);
thread_queue_free (m, m->background);
-
+
+#if defined(HAVE_POLL)
+ XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
+#endif
XFREE (MTYPE_THREAD_MASTER, m);
if (cpu_record)
return thread;
}
+#if defined (HAVE_POLL)
+
+#define fd_copy_fd_set(X) (X)
+
+static short
+realloc_pfds (struct thread_master *m, int fd)
+{
+ size_t oldpfdlen = m->handler.pfdsize * sizeof(struct pollfd);
+ void *newpfd = NULL;
+
+ m->handler.pfdsize *= 2;
+ newpfd = XREALLOC (MTYPE_THREAD, m->handler.pfds, m->handler.pfdsize * sizeof(struct pollfd));
+ if (newpfd == NULL)
+ {
+ close(fd);
+ zlog (NULL, LOG_ERR, "failed to allocate space for pollfds");
+ return 0;
+ }
+ memset((struct pollfd*)newpfd + (m->handler.pfdsize / 2), 0, oldpfdlen);
+ m->handler.pfds = (struct pollfd*)newpfd;
+ return 1;
+}
+
+/* generic add thread function */
+static struct thread *
+generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
+ void *arg, int fd, const char* funcname, int dir)
+{
+ struct thread *thread;
+
+ u_char type;
+ short int event;
+
+ if (dir == THREAD_READ)
+ {
+ event = (POLLIN | POLLHUP);
+ type = THREAD_READ;
+ }
+ else
+ {
+ event = (POLLOUT | POLLHUP);
+ type = THREAD_WRITE;
+ }
+
+ nfds_t queuepos = m->handler.pfdcount;
+ nfds_t i=0;
+ for (i=0; i<m->handler.pfdcount; i++)
+ if (m->handler.pfds[i].fd == fd)
+ {
+ queuepos = i;
+ break;
+ }
+
+ /* is there enough space for a new fd? */
+ if (queuepos >= m->handler.pfdsize)
+ if (realloc_pfds(m, fd) == 0)
+ return NULL;
+
+ thread = thread_get (m, type, func, arg, funcname);
+ m->handler.pfds[queuepos].fd = fd;
+ m->handler.pfds[queuepos].events |= event;
+ if (queuepos == m->handler.pfdcount)
+ m->handler.pfdcount++;
+
+ return thread;
+}
+#else
+
#define fd_copy_fd_set(X) (X)
+#endif
static int
-fd_select (int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *t)
+fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
{
- return(select(size, read, write, except, t));
+ int num;
+#if defined(HAVE_POLL)
+ /* recalc timeout for poll. Attention NULL pointer is no timeout with
+ select, where with poll no timeount is -1 */
+ int timeout = -1;
+ if (timer_wait != NULL)
+ timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
+
+ num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
+#else
+ num = select (size, read, write, except, timer_wait);
+#endif
+
+ return num;
}
static int
-fd_is_set (int fd, thread_fd_set *fdset)
+fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
{
- return FD_ISSET (fd, fdset);
+#if defined(HAVE_POLL)
+ return 1;
+#else
+ return FD_ISSET (THREAD_FD (thread), fdset);
+#endif
}
static int
-fd_clear_read_write (int fd, thread_fd_set *fdset)
+fd_clear_read_write (struct thread *thread)
{
+#if !defined(HAVE_POLL)
+ thread_fd_set *fdset = NULL;
+ int fd = THREAD_FD (thread);
+
+ if (thread->type == THREAD_READ)
+ fdset = &thread->master->handler.readfd;
+ else
+ fdset = &thread->master->handler.writefd;
+
if (!FD_ISSET (fd, fdset))
return 0;
FD_CLR (fd, fdset);
+#endif
return 1;
}
int (*func) (struct thread *), void *arg, int fd, const char* funcname)
{
struct thread *thread = NULL;
- thread_fd_set *fdset = NULL;
+#if !defined(HAVE_POLL)
+ thread_fd_set *fdset = NULL;
if (dir == THREAD_READ)
- fdset = &m->readfd;
+ fdset = &m->handler.readfd;
else
- fdset = &m->writefd;
+ fdset = &m->handler.writefd;
+#endif
+#if defined (HAVE_POLL)
+ thread = generic_thread_add(m, func, arg, fd, funcname, dir);
+
+ if (thread == NULL)
+ return NULL;
+#else
if (FD_ISSET (fd, fdset))
{
zlog (NULL, LOG_WARNING, "There is already %s fd [%d]", (dir = THREAD_READ) ? "read" : "write", fd);
}
FD_SET (fd, fdset);
-
thread = thread_get (m, dir, func, arg, funcname);
+#endif
+
thread->u.fd = fd;
if (dir == THREAD_READ)
thread_add_fd (m->read, thread);
return thread;
}
+static void
+thread_cancel_read_write (struct thread *thread)
+{
+#if defined(HAVE_POLL)
+ nfds_t i;
+
+ for (i=0;i<thread->master->handler.pfdcount;++i)
+ if (thread->master->handler.pfds[i].fd == thread->u.fd)
+ {
+ /* remove thread fds from pfd list */
+ memmove(thread->master->handler.pfds+i,
+ thread->master->handler.pfds+i+1,
+ (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
+ i--;
+ thread->master->handler.pfdcount--;
+ }
+#endif
+
+ fd_clear_read_write (thread);
+}
+
/* Cancel thread from scheduler. */
void
thread_cancel (struct thread *thread)
switch (thread->type)
{
case THREAD_READ:
- assert (fd_clear_read_write (thread->u.fd, &thread->master->readfd));
+ thread_cancel_read_write (thread);
thread_array = thread->master->read;
break;
case THREAD_WRITE:
- assert (fd_clear_read_write (thread->u.fd, &thread->master->writefd));
+ thread_cancel_read_write (thread);
thread_array = thread->master->write;
break;
case THREAD_TIMER:
}
static int
-thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset)
+thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
{
- thread_fd_set *mfdset = NULL;
struct thread **thread_array;
if (!thread)
return 0;
if (thread->type == THREAD_READ)
- {
- mfdset = &m->readfd;
- thread_array = m->read;
- }
+ thread_array = m->read;
else
- {
- mfdset = &m->writefd;
- thread_array = m->write;
- }
+ thread_array = m->write;
- if (fd_is_set (THREAD_FD (thread), fdset))
+ if (fd_is_set (thread, fdset, pos))
{
- fd_clear_read_write (THREAD_FD (thread), mfdset);
+ fd_clear_read_write (thread);
thread_delete_fd (thread_array, thread);
thread_list_add (&m->ready, thread);
thread->type = THREAD_READY;
+#if defined(HAVE_POLL)
+ thread->master->handler.pfds[pos].events &= ~(state);
+#endif
return 1;
}
return 0;
}
-static int
+#if defined(HAVE_POLL)
+
+#if defined(HAVE_SNMP)
+/* add snmp fds to poll set */
+static void
+add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int fdsetsize)
+{
+ int i;
+ m->handler.pfdcountsnmp = m->handler.pfdcount;
+ /* cycle trough fds and add neccessary fds to poll set */
+ for (i=0;i<fdsetsize;++i)
+ {
+ if (FD_ISSET(i, snmpfds))
+ {
+ if (m->handler.pfdcountsnmp > m->handler.pfdsize)
+ if (realloc_pfds(m, i) < 0)
+ return;
+
+ m->handler.pfds[m->handler.pfdcountsnmp].fd = i;
+ m->handler.pfds[m->handler.pfdcountsnmp].events = POLLIN;
+ m->handler.pfdcountsnmp++;
+ }
+ }
+}
+#endif
+
+/* check poll events */
+static void
+check_pollfds(struct thread_master *m, fd_set *readfd, int num)
+{
+ nfds_t i = 0;
+ int ready = 0;
+ for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
+ {
+ /* no event for current fd? immideatly continue */
+ if(m->handler.pfds[i].revents == 0)
+ continue;
+
+ /* remove fd from list on POLLNVAL */
+ if (m->handler.pfds[i].revents & POLLNVAL)
+ {
+ memmove(m->handler.pfds+i,
+ m->handler.pfds+i+1,
+ (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
+ m->handler.pfdcount--;
+ i--;
+ continue;
+ }
+
+ /* POLLIN / POLLOUT process event */
+ if (m->handler.pfds[i].revents & POLLIN)
+ ready += thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
+ if (m->handler.pfds[i].revents & POLLOUT)
+ ready += thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
+
+ /* remove fd from list on POLLHUP after other event is processed */
+ if (m->handler.pfds[i].revents & POLLHUP)
+ {
+ memmove(m->handler.pfds+i,
+ m->handler.pfds+i+1,
+ (m->handler.pfdsize-i-1) * sizeof(struct pollfd));
+ m->handler.pfdcount--;
+ i--;
+ ready++;
+ }
+ else
+ m->handler.pfds[i].revents = 0;
+ }
+}
+#endif
+
+static void
thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
{
+#if defined (HAVE_POLL)
+ check_pollfds (m, rset, num);
+#else
int ready = 0, index;
for (index = 0; index < m->fd_limit && ready < num; ++index)
{
- ready += thread_process_fds_helper (m, m->read[index], rset);
- ready += thread_process_fds_helper (m, m->write[index], wset);
+ ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
+ ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
}
- return num - ready;
+#endif
}
/* Add all timers that have popped to the ready list. */
thread_process (&m->event);
/* Structure copy. */
- readfd = fd_copy_fd_set(m->readfd);
- writefd = fd_copy_fd_set(m->writefd);
- exceptfd = fd_copy_fd_set(m->exceptfd);
+#if !defined(HAVE_POLL)
+ readfd = fd_copy_fd_set(m->handler.readfd);
+ writefd = fd_copy_fd_set(m->handler.writefd);
+ exceptfd = fd_copy_fd_set(m->handler.exceptfd);
+#endif
/* Calculate select wait timer if nothing else to do */
if (m->ready.count == 0)
snmpblock = 0;
memcpy(&snmp_timer_wait, timer_wait, sizeof(struct timeval));
}
+#if defined(HAVE_POLL)
+ /* clear fdset since there are no other fds in fd_set,
+ then add injected fds from snmp_select_info into pollset */
+ FD_ZERO(&readfd);
+#endif
snmp_select_info(&fdsetsize, &readfd, &snmp_timer_wait, &snmpblock);
+#if defined(HAVE_POLL)
+ add_snmp_pollfds(m, &readfd, fdsetsize);
+#endif
if (snmpblock == 0)
timer_wait = &snmp_timer_wait;
}
#endif
- num = fd_select (FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
+ num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
/* Signals should get quick treatment */
if (num < 0)
}
#if defined HAVE_SNMP && defined SNMP_AGENTX
+#if defined(HAVE_POLL)
+ /* re-enter pollfds in fd_set for handling in snmp_read */
+ FD_ZERO(&readfd);
+ nfds_t i;
+ for (i = m->handler.pfdcount; i < m->handler.pfdcountsnmp; ++i)
+ {
+ if (m->handler.pfds[i].revents == POLLIN)
+ FD_SET(m->handler.pfds[i].fd, &readfd);
+ }
+#endif
if (agentx_enabled)
{
if (num > 0)