summaryrefslogtreecommitdiff
path: root/lib/stream.c
diff options
context:
space:
mode:
authorQuentin Young <qlyoung@cumulusnetworks.com>2018-04-21 16:51:54 -0400
committerQuentin Young <qlyoung@cumulusnetworks.com>2018-05-07 11:37:07 -0400
commit363e24c65140d07c2ced580698bd1427c6b99a55 (patch)
tree9fa78138b63b883535e8b1a8f099d7fd721eb17a /lib/stream.c
parente8f95403e4b4d55c605ef49a0d772f30834134a4 (diff)
lib: add mt-safe variants for stream_fifo ops
stream_fifo is used as our standard internal message queue. Message queues are useful in multithreaded environments. Up until now I have been doing my own synchronization when using stream_fifo in this way; this patch gets rid of the need for that boilerplate and decreases the risk of locking mistakes when working with this datastructure. Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
Diffstat (limited to 'lib/stream.c')
-rw-r--r--lib/stream.c60
1 files changed, 56 insertions, 4 deletions
diff --git a/lib/stream.c b/lib/stream.c
index c4edd3d5bf..aba4c20166 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -21,6 +21,7 @@
#include <zebra.h>
#include <stddef.h>
+#include <pthread.h>
#include "stream.h"
#include "memory.h"
@@ -1101,6 +1102,7 @@ struct stream_fifo *stream_fifo_new(void)
struct stream_fifo *new;
new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
+ pthread_mutex_init(&new->mtx, NULL);
return new;
}
@@ -1115,7 +1117,16 @@ void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
fifo->tail = s;
fifo->tail->next = NULL;
- fifo->count++;
+ atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
+}
+
+void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
+{
+ pthread_mutex_lock(&fifo->mtx);
+ {
+ stream_fifo_push(fifo, s);
+ }
+ pthread_mutex_unlock(&fifo->mtx);
}
/* Delete first stream from fifo. */
@@ -1131,7 +1142,8 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
if (fifo->head == NULL)
fifo->tail = NULL;
- fifo->count--;
+ atomic_fetch_sub_explicit(&fifo->count, 1,
+ memory_order_release);
/* ensure stream is scrubbed of references to this fifo */
s->next = NULL;
@@ -1140,12 +1152,37 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
return s;
}
-/* Return first fifo entry. */
+struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
+{
+ struct stream *ret;
+
+ pthread_mutex_lock(&fifo->mtx);
+ {
+ ret = stream_fifo_pop(fifo);
+ }
+ pthread_mutex_unlock(&fifo->mtx);
+
+ return ret;
+}
+
struct stream *stream_fifo_head(struct stream_fifo *fifo)
{
return fifo->head;
}
+struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
+{
+ struct stream *ret;
+
+ pthread_mutex_lock(&fifo->mtx);
+ {
+ ret = stream_fifo_head(fifo);
+ }
+ pthread_mutex_unlock(&fifo->mtx);
+
+ return ret;
+}
+
void stream_fifo_clean(struct stream_fifo *fifo)
{
struct stream *s;
@@ -1156,11 +1193,26 @@ void stream_fifo_clean(struct stream_fifo *fifo)
stream_free(s);
}
fifo->head = fifo->tail = NULL;
- fifo->count = 0;
+ atomic_store_explicit(&fifo->count, 0, memory_order_release);
+}
+
+void stream_fifo_clean_safe(struct stream_fifo *fifo)
+{
+ pthread_mutex_lock(&fifo->mtx);
+ {
+ stream_fifo_clean(fifo);
+ }
+ pthread_mutex_unlock(&fifo->mtx);
+}
+
+size_t stream_fifo_count_safe(struct stream_fifo *fifo)
+{
+ return atomic_load_explicit(&fifo->count, memory_order_acquire);
}
void stream_fifo_free(struct stream_fifo *fifo)
{
stream_fifo_clean(fifo);
+ pthread_mutex_destroy(&fifo->mtx);
XFREE(MTYPE_STREAM_FIFO, fifo);
}