]> git.puffer.fish Git - matthieu/frr.git/commitdiff
lib: add mt-safe variants for stream_fifo ops
authorQuentin Young <qlyoung@cumulusnetworks.com>
Sat, 21 Apr 2018 20:51:54 +0000 (16:51 -0400)
committerQuentin Young <qlyoung@cumulusnetworks.com>
Mon, 7 May 2018 15:37:07 +0000 (11:37 -0400)
stream_fifo is used as our standard internal message queue. Message
queues are useful in multithreaded environments. Up until now I have
been doing my own synchronization when using stream_fifo in this way;
this patch gets rid of the need for that boilerplate and decreases the
risk of locking mistakes when working with this datastructure.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
lib/stream.c
lib/stream.h

index c4edd3d5bff3e622f8fc35a01c67ad7e0389b0cb..aba4c20166175e27049d126def18c4315e95e25e 100644 (file)
@@ -21,6 +21,7 @@
 
 #include <zebra.h>
 #include <stddef.h>
+#include <pthread.h>
 
 #include "stream.h"
 #include "memory.h"
@@ -1101,6 +1102,7 @@ struct stream_fifo *stream_fifo_new(void)
        struct stream_fifo *new;
 
        new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
+       pthread_mutex_init(&new->mtx, NULL);
        return new;
 }
 
@@ -1115,7 +1117,16 @@ void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
        fifo->tail = s;
        fifo->tail->next = NULL;
 
-       fifo->count++;
+       atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
+}
+
+void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
+{
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               stream_fifo_push(fifo, s);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
 }
 
 /* Delete first stream from fifo. */
@@ -1131,7 +1142,8 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
                if (fifo->head == NULL)
                        fifo->tail = NULL;
 
-               fifo->count--;
+               atomic_fetch_sub_explicit(&fifo->count, 1,
+                                         memory_order_release);
 
                /* ensure stream is scrubbed of references to this fifo */
                s->next = NULL;
@@ -1140,12 +1152,37 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
        return s;
 }
 
-/* Return first fifo entry. */
+struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
+{
+       struct stream *ret;
+
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               ret = stream_fifo_pop(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+
+       return ret;
+}
+
 struct stream *stream_fifo_head(struct stream_fifo *fifo)
 {
        return fifo->head;
 }
 
+struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
+{
+       struct stream *ret;
+
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               ret = stream_fifo_head(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+
+       return ret;
+}
+
 void stream_fifo_clean(struct stream_fifo *fifo)
 {
        struct stream *s;
@@ -1156,11 +1193,26 @@ void stream_fifo_clean(struct stream_fifo *fifo)
                stream_free(s);
        }
        fifo->head = fifo->tail = NULL;
-       fifo->count = 0;
+       atomic_store_explicit(&fifo->count, 0, memory_order_release);
+}
+
+void stream_fifo_clean_safe(struct stream_fifo *fifo)
+{
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               stream_fifo_clean(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+}
+
+size_t stream_fifo_count_safe(struct stream_fifo *fifo)
+{
+       return atomic_load_explicit(&fifo->count, memory_order_acquire);
 }
 
 void stream_fifo_free(struct stream_fifo *fifo)
 {
        stream_fifo_clean(fifo);
+       pthread_mutex_destroy(&fifo->mtx);
        XFREE(MTYPE_STREAM_FIFO, fifo);
 }
index cc74e22a6723a7187cfd05a667660d906957943a..e5d325e43ed4940dd65f421d6152e60d892a9d81 100644 (file)
@@ -22,6 +22,9 @@
 #ifndef _ZEBRA_STREAM_H
 #define _ZEBRA_STREAM_H
 
+#include <pthread.h>
+
+#include "frratomic.h"
 #include "mpls.h"
 #include "prefix.h"
 
@@ -107,7 +110,11 @@ struct stream {
 
 /* First in first out queue structure. */
 struct stream_fifo {
-       size_t count;
+       /* lock for mt-safe operations */
+       pthread_mutex_t mtx;
+
+       /* number of streams in this fifo */
+       _Atomic size_t count;
 
        struct stream *head;
        struct stream *tail;
@@ -240,12 +247,94 @@ extern int stream_empty(struct stream *); /* is the stream empty? */
 /* deprecated */
 extern uint8_t *stream_pnt(struct stream *);
 
-/* Stream fifo. */
+/*
+ * Operations on struct stream_fifo.
+ *
+ * Each function has a safe variant, which ensures that the operation performed
+ * is atomic with respect to the operations performed by all other safe
+ * variants. In other words, the safe variants lock the stream_fifo's mutex
+ * before performing their action. These are provided for convenience when
+ * using stream_fifo in a multithreaded context, to alleviate the need for the
+ * caller to implement their own synchronization around the stream_fifo.
+ *
+ * The following functions do not have safe variants. The caller must ensure
+ * that these operations are performed safely in a multithreaded context:
+ * - stream_fifo_new
+ * - stream_fifo_free
+ */
+
+/*
+ * Create a new stream_fifo.
+ *
+ * Returns:
+ *    newly created stream_fifo
+ */
 extern struct stream_fifo *stream_fifo_new(void);
+
+/*
+ * Push a stream onto a stream_fifo.
+ *
+ * fifo
+ *    the stream_fifo to push onto
+ *
+ * s
+ *    the stream to push onto the stream_fifo
+ */
 extern void stream_fifo_push(struct stream_fifo *fifo, struct stream *s);
+extern void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s);
+
+/*
+ * Pop a stream off a stream_fifo.
+ *
+ * fifo
+ *    the stream_fifo to pop from
+ *
+ * Returns:
+ *    the next stream in the stream_fifo
+ */
 extern struct stream *stream_fifo_pop(struct stream_fifo *fifo);
+extern struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo);
+
+/*
+ * Retrieve the next stream from a stream_fifo without popping it.
+ *
+ * fifo
+ *    the stream_fifo to operate on
+ *
+ * Returns:
+ *    the next stream that would be returned from stream_fifo_pop
+ */
 extern struct stream *stream_fifo_head(struct stream_fifo *fifo);
+extern struct stream *stream_fifo_head_safe(struct stream_fifo *fifo);
+
+/*
+ * Remove all streams from a stream_fifo.
+ *
+ * fifo
+ *    the stream_fifo to clean
+ */
 extern void stream_fifo_clean(struct stream_fifo *fifo);
+extern void stream_fifo_clean_safe(struct stream_fifo *fifo);
+
+/*
+ * Retrieve number of streams on a stream_fifo.
+ *
+ * fifo
+ *    the stream_fifo to retrieve the count for
+ *
+ * Returns:
+ *    the number of streams on the stream_fifo
+ */
+extern size_t stream_fifo_count_safe(struct stream_fifo *fifo);
+
+/*
+ * Free a stream_fifo.
+ *
+ * Calls stream_fifo_clean, then deinitializes the stream_fifo and frees it.
+ *
+ * fifo
+ *    the stream_fifo to free
+ */
 extern void stream_fifo_free(struct stream_fifo *fifo);
 
 /* This is here because "<< 24" is particularly problematic in C.