summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- lib/lib_errors.c 6
-rw-r--r-- lib/lib_errors.h 1
-rw-r--r-- lib/thread.c 83
3 files changed, 74 insertions, 16 deletions
diff --git a/lib/lib_errors.c b/lib/lib_errors.c
index b6c764d873..e0559f332d 100644
--- a/lib/lib_errors.c
+++ b/lib/lib_errors.c
@@ -51,6 +51,12 @@ static struct log_ref ferr_lib_warn[] = {
.suggestion = "Gather log data and open an Issue",
},
{
+ .code = EC_LIB_NO_THREAD,
+ .title = "The Event subsystem has detected an internal FD problem",
+ .description = "The Event subsystem has detected a file descriptor read/write event without an associated handling function. This is a bug, please collect log data and open an issue.",
+ .suggestion = "Gather log data and open an Issue",
+ },
+ {
.code = EC_LIB_RMAP_RECURSION_LIMIT,
.title = "Reached the Route-Map Recursion Limit",
.description = "The Route-Map subsystem has detected a route-map depth of RMAP_RECURSION_LIMIT and has stopped processing",
diff --git a/lib/lib_errors.h b/lib/lib_errors.h
index 39b39fb065..996a16ba95 100644
--- a/lib/lib_errors.h
+++ b/lib/lib_errors.h
@@ -45,6 +45,7 @@ enum lib_log_refs {
EC_LIB_STREAM,
EC_LIB_LINUX_NS,
EC_LIB_SLOW_THREAD,
+ EC_LIB_NO_THREAD,
EC_LIB_RMAP_RECURSION_LIMIT,
EC_LIB_BACKUP_CONFIG,
EC_LIB_VRF_LENGTH,
diff --git a/lib/thread.c b/lib/thread.c
index 7489be5c2d..fc2de09df0 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -307,6 +307,7 @@ static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
{
const char *name = m->name ? m->name : "main";
char underline[strlen(name) + 1];
+ struct thread *thread;
uint32_t i;
memset(underline, '-', sizeof(underline));
@@ -316,11 +317,31 @@ static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
vty_out(vty, "----------------------%s\n", underline);
vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
m->fd_limit);
- for (i = 0; i < m->handler.pfdcount; i++)
- vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i,
- m->handler.pfds[i].fd,
- m->handler.pfds[i].events,
+ for (i = 0; i < m->handler.pfdcount; i++) {
+ vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
+ m->handler.pfds[i].fd, m->handler.pfds[i].events,
m->handler.pfds[i].revents);
+
+ if (m->handler.pfds[i].events & POLLIN) {
+ thread = m->read[m->handler.pfds[i].fd];
+
+ if (!thread)
+ vty_out(vty, "ERROR ");
+ else
+ vty_out(vty, "%s ", thread->funcname);
+ } else
+ vty_out(vty, " ");
+
+ if (m->handler.pfds[i].events & POLLOUT) {
+ thread = m->write[m->handler.pfds[i].fd];
+
+ if (!thread)
+ vty_out(vty, "ERROR\n");
+ else
+ vty_out(vty, "%s\n", thread->funcname);
+ } else
+ vty_out(vty, "\n");
+ }
}
DEFUN (show_thread_poll,
@@ -756,6 +777,7 @@ struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
debugargdef)
{
struct thread *thread = NULL;
+ struct thread **thread_array;
assert(fd >= 0 && fd < m->fd_limit);
pthread_mutex_lock(&m->mtx);
@@ -770,11 +792,25 @@ struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
/* default to a new pollfd */
nfds_t queuepos = m->handler.pfdcount;
+ if (dir == THREAD_READ)
+ thread_array = m->read;
+ else
+ thread_array = m->write;
+
/* if we already have a pollfd for our file descriptor, find and
* use it */
for (nfds_t i = 0; i < m->handler.pfdcount; i++)
if (m->handler.pfds[i].fd == fd) {
queuepos = i;
+
+#ifdef DEV_BUILD
+ /*
+ * What happens if we have a thread already
+ * created for this event?
+ */
+ if (thread_array[fd])
+ assert(!"Thread already scheduled for file descriptor");
+#endif
break;
}
@@ -794,10 +830,7 @@ struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m,
pthread_mutex_lock(&thread->mtx);
{
thread->u.fd = fd;
- if (dir == THREAD_READ)
- m->read[thread->u.fd] = thread;
- else
- m->write[thread->u.fd] = thread;
+ thread_array[thread->u.fd] = thread;
}
pthread_mutex_unlock(&thread->mtx);
@@ -1238,12 +1271,31 @@ static struct thread *thread_run(struct thread_master *m, struct thread *thread,
}
static int thread_process_io_helper(struct thread_master *m,
- struct thread *thread, short state, int pos)
+ struct thread *thread, short state,
+ short actual_state, int pos)
{
struct thread **thread_array;
- if (!thread)
+ /*
+ * poll() clears the .events field, but the pollfd array we
+ * pass to poll() is a copy of the one used to schedule threads.
+ * We need to synchronize state between the two here by applying
+ * the same changes poll() made on the copy of the "real" pollfd
+ * array.
+ *
+ * This cleans up a possible infinite loop where we refuse
+ * to respond to a poll event but poll is insistent that
+ * we should.
+ */
+ m->handler.pfds[pos].events &= ~(state);
+
+ if (!thread) {
+ if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
+ flog_err(EC_LIB_NO_THREAD,
+ "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n",
+ m->handler.pfds[pos].fd, actual_state);
return 0;
+ }
if (thread->type == THREAD_READ)
thread_array = m->read;
@@ -1253,9 +1305,7 @@ static int thread_process_io_helper(struct thread_master *m,
thread_array[thread->u.fd] = NULL;
thread_list_add_tail(&m->ready, thread);
thread->type = THREAD_READY;
- /* if another pthread scheduled this file descriptor for the event we're
- * responding to, no problem; we're getting to it now */
- thread->master->handler.pfds[pos].events &= ~(state);
+
return 1;
}
@@ -1291,12 +1341,13 @@ static void thread_process_io(struct thread_master *m, unsigned int num)
* there's no need to update it. Similarily, barring deletion,
* the fd
* should still be a valid index into the master's pfds. */
- if (pfds[i].revents & (POLLIN | POLLHUP))
+ if (pfds[i].revents & (POLLIN | POLLHUP)) {
thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
- i);
+ pfds[i].revents, i);
+ }
if (pfds[i].revents & POLLOUT)
thread_process_io_helper(m, m->write[pfds[i].fd],
- POLLOUT, i);
+ POLLOUT, pfds[i].revents, i);
/* if one of our file descriptors is garbage, remove the same
* from